Merge pull request #3718 from etschannen/master

Merge release 6.3 into master
This commit is contained in:
Evan Tschannen 2020-09-01 09:00:34 -07:00 committed by GitHub
commit cf19c666f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
66 changed files with 1092 additions and 266 deletions

View File

@ -22,6 +22,8 @@ set(JAVA_BINDING_SRCS
src/main/com/apple/foundationdb/directory/NoSuchDirectoryException.java
src/main/com/apple/foundationdb/directory/package-info.java
src/main/com/apple/foundationdb/directory/PathUtil.java
src/main/com/apple/foundationdb/DirectBufferIterator.java
src/main/com/apple/foundationdb/DirectBufferPool.java
src/main/com/apple/foundationdb/FDB.java
src/main/com/apple/foundationdb/FDBDatabase.java
src/main/com/apple/foundationdb/FDBTransaction.java

View File

@ -305,42 +305,6 @@ JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureStrings_FutureString
return arr;
}
// Builds a RangeResultSummary Java object for a range-read future: the last key of the
// returned batch (or null when the batch is empty), the number of pairs, and the `more` flag.
// Returns JNI_NULL with a Java exception pending on any failure.
JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureResults_FutureResults_1getSummary(JNIEnv *jenv, jobject, jlong future) {
	if( !future ) {
		throwParamNotNull(jenv);
		return JNI_NULL;
	}

	const FDBKeyValue *pairs;
	int pairCount;
	fdb_bool_t hasMore;
	fdb_error_t status = fdb_future_get_keyvalue_array( (FDBFuture *)future, &pairs, &pairCount, &hasMore );
	if( status ) {
		safeThrow( jenv, getThrowable( jenv, status ) );
		return JNI_NULL;
	}

	// Stays null for an empty batch; otherwise a copy of the final key.
	jbyteArray lastKey = JNI_NULL;
	if( pairCount ) {
		const FDBKeyValue &tail = pairs[pairCount - 1];
		lastKey = jenv->NewByteArray(tail.key_length);
		if( !lastKey ) {
			// Allocation failed: make sure some exception is pending before returning.
			if( !jenv->ExceptionOccurred() )
				throwOutOfMem(jenv);
			return JNI_NULL;
		}
		jenv->SetByteArrayRegion(lastKey, 0, tail.key_length, (jbyte *)tail.key);
	}

	jobject summary = jenv->NewObject(range_result_summary_class, range_result_summary_init, lastKey, pairCount, (jboolean)hasMore);
	if( jenv->ExceptionOccurred() )
		return JNI_NULL;

	return summary;
}
// SOMEDAY: explore doing this more efficiently with Direct ByteBuffers
JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureResults_FutureResults_1get(JNIEnv *jenv, jobject, jlong future) {
if( !future ) {
@ -640,6 +604,68 @@ JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1
return (jlong)f;
}
// Serializes the key-value pairs of a range-read future into the caller's direct buffer.
//
// Layout (every integer is a jint copied with memcpy, i.e. in native byte order):
//   [count][more] followed, per pair, by [keyLength][valueLength][key bytes][value bytes]
//
// Only the leading pairs that fit within `bufferCapacity` bytes are written. When pairs are
// left out, `count` is the number actually written and `more` is set to true.
// On any failure a Java exception is left pending and the buffer contents are unspecified.
JNIEXPORT void JNICALL Java_com_apple_foundationdb_FutureResults_FutureResults_1getDirect(
    JNIEnv* jenv, jobject, jlong future, jobject jbuffer, jint bufferCapacity) {
	if( !future ) {
		throwParamNotNull(jenv);
		return;
	}

	uint8_t* buffer = (uint8_t*)jenv->GetDirectBufferAddress(jbuffer);
	if (!buffer) {
		if (!jenv->ExceptionOccurred())
			throwRuntimeEx(jenv, "Error getting handle to native resources");
		return;
	}

	FDBFuture* f = (FDBFuture*)future;
	const FDBKeyValue *kvs;
	int count;
	fdb_bool_t more;
	fdb_error_t err = fdb_future_get_keyvalue_array( f, &kvs, &count, &more );
	if( err ) {
		safeThrow( jenv, getThrowable( jenv, err ) );
		return;
	}

	// Capacity for Metadata+Keys+Values
	//  => sizeof(jint) for total key/value pairs
	//  => sizeof(jint) to store more flag
	//  => sizeof(jint) to store key length per KV pair
	//  => sizeof(jint) to store value length per KV pair
	const jint headerSize = 2 * sizeof(jint);
	const jint perPairOverhead = 2 * sizeof(jint);

	// The header is written unconditionally below, so refuse a buffer that cannot hold it
	// instead of writing past its end.
	if (bufferCapacity < headerSize) {
		throwRuntimeEx(jenv, "Direct buffer is too small to hold the range result header");
		return;
	}

	// Accumulate in a signed 64-bit value so the sum cannot wrap before it is compared.
	jlong totalCapacityNeeded = headerSize;
	for (int i = 0; i < count; i++) {
		totalCapacityNeeded += (jlong)kvs[i].key_length + (jlong)kvs[i].value_length + perPairOverhead;
		if (bufferCapacity < totalCapacityNeeded) {
			count = i; /* Only fit first `i` K/V pairs */
			more = true;
			break;
		}
	}

	// From here on every offset is bounded by bufferCapacity, so `int` arithmetic is safe.
	int offset = 0;

	// First copy RangeResultSummary, i.e. [keyCount, more].
	// Go through jint temporaries so exactly sizeof(jint) bytes of a jint are copied,
	// independent of the widths of `int` and `fdb_bool_t`.
	const jint pairCount = count;
	const jint moreFlag = more ? 1 : 0;
	memcpy(buffer + offset, &pairCount, sizeof(jint));
	offset += sizeof(jint);
	memcpy(buffer + offset, &moreFlag, sizeof(jint));
	offset += sizeof(jint);

	for (int i = 0; i < count; i++) {
		const jint keyLength = kvs[i].key_length;
		const jint valueLength = kvs[i].value_length;

		memcpy(buffer + offset, &keyLength, sizeof(jint));
		memcpy(buffer + offset + sizeof(jint), &valueLength, sizeof(jint));
		offset += perPairOverhead;

		memcpy(buffer + offset, kvs[i].key, keyLength);
		offset += keyLength;

		memcpy(buffer + offset, kvs[i].value, valueLength);
		offset += valueLength;
	}
}
JNIEXPORT jlong JNICALL Java_com_apple_foundationdb_FDBTransaction_Transaction_1getEstimatedRangeSizeBytes(JNIEnv *jenv, jobject, jlong tPtr,
jbyteArray beginKeyBytes, jbyteArray endKeyBytes) {
if( !tPtr || !beginKeyBytes || !endKeyBytes) {

View File

@ -0,0 +1,108 @@
/*
* DirectBufferIterator.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.apple.foundationdb;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
 * Iterates over the key-value pairs that the JNI wrapper serialized into a shared direct buffer.
 *
 * <p>A typical usage is as follows: wrap a buffer borrowed from {@link DirectBufferPool}, pass
 * {@link #getBuffer()} to the native call that fills it, call {@link #readResultsSummary()} to decode
 * the header, then read the pairs with {@link #next()}. {@link #close()} hands the buffer back to the pool.
 *
 * <p>The serialization format of result is =&gt;
 * {@code [int keyCount, int more, ListOf<(int keyLen, int valueLen, byte[] key, byte[] value)>]}
 * where every int is in native byte order and a non-zero {@code more} means {@code true}.
 */
class DirectBufferIterator implements Iterator<KeyValue>, AutoCloseable {
	private ByteBuffer byteBuffer;
	private int current = 0;
	// -1 until readResultsSummary() has decoded the header.
	private int keyCount = -1;
	private boolean more = false;

	/**
	 * Wraps {@code buffer} and switches it to native byte order, matching how the native side writes ints.
	 *
	 * @param buffer the buffer the native call will fill
	 */
	public DirectBufferIterator(ByteBuffer buffer) {
		byteBuffer = buffer;
		byteBuffer.order(ByteOrder.nativeOrder());
	}

	/**
	 * Returns the buffer to {@link DirectBufferPool}. Calling this more than once has no further effect.
	 */
	@Override
	public void close() {
		if (byteBuffer != null) {
			DirectBufferPool.getInstance().add(byteBuffer);
			byteBuffer = null;
		}
	}

	/**
	 * @return {@code true} once {@link #readResultsSummary()} has decoded a non-negative key count
	 */
	public boolean hasResultReady() {
		return keyCount > -1;
	}

	@Override
	public boolean hasNext() {
		assert (hasResultReady());
		return current < keyCount;
	}

	/**
	 * Decodes the next pair at the buffer's current position, copying key and value into fresh arrays.
	 *
	 * @throws NoSuchElementException if all {@link #count()} pairs have already been returned
	 */
	@Override
	public KeyValue next() {
		assert (hasResultReady()); // Must be called once its ready.
		if (!hasNext()) {
			throw new NoSuchElementException();
		}

		final int keyLen = byteBuffer.getInt();
		final int valueLen = byteBuffer.getInt();
		byte[] key = new byte[keyLen];
		byteBuffer.get(key);

		byte[] value = new byte[valueLen];
		byteBuffer.get(value);

		current += 1;
		return new KeyValue(key, value);
	}

	/**
	 * @return the wrapped buffer, or {@code null} after {@link #close()}
	 */
	public ByteBuffer getBuffer() {
		return byteBuffer;
	}

	/**
	 * @return the number of pairs announced by the header
	 */
	public int count() {
		assert (hasResultReady());
		return keyCount;
	}

	/**
	 * @return the {@code more} flag announced by the header
	 */
	public boolean hasMore() {
		assert (hasResultReady());
		return more;
	}

	/**
	 * @return how many pairs have been returned by {@link #next()} since the header was last read
	 */
	public int currentIndex() {
		return current;
	}

	/**
	 * Decodes the {@code [keyCount, more]} header from the start of the buffer, leaving the buffer
	 * positioned at the first pair. The read index is reset together with the buffer position so
	 * that {@link #hasNext()} and {@link #next()} stay consistent if the header is read again.
	 */
	public void readResultsSummary() {
		byteBuffer.rewind();
		current = 0;
		keyCount = byteBuffer.getInt();
		// The flag is a C-style boolean written as an int: any non-zero value is true.
		more = byteBuffer.getInt() != 0;
	}
}

View File

@ -0,0 +1,89 @@
/*
* DirectBufferPool.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.apple.foundationdb;
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
/**
 * A singleton that manages a pool of equally sized direct {@link ByteBuffer}s shared by the
 * {@link DirectBufferIterator} instances. It is the responsibility of the user to return
 * borrowed buffers through {@link #add(ByteBuffer)}.
 */
class DirectBufferPool {
	static final DirectBufferPool __instance = new DirectBufferPool();

	// When tuning this, make sure that the size of each buffer is always greater than the
	// maximum size of a single KV allowed by FDB. The current limit is:
	// 10kB for key + 100kB for value + 1 int for count + 1 int for more + 2 ints for KV sizes
	public static final int MIN_BUFFER_SIZE = (10 + 100) * 1000 + Integer.BYTES * 4;

	private static final int DEFAULT_NUM_BUFFERS = 128;
	private static final int DEFAULT_BUFFER_SIZE = 1024 * 512;

	private ArrayBlockingQueue<ByteBuffer> buffers;
	private int currentBufferCapacity;

	/**
	 * Creates a pool populated with the default number of default-sized buffers.
	 */
	public DirectBufferPool() {
		resize(DEFAULT_NUM_BUFFERS, DEFAULT_BUFFER_SIZE);
	}

	/**
	 * @return the shared pool instance
	 */
	public static DirectBufferPool getInstance() {
		return __instance;
	}

	/**
	 * Replaces the pool with {@code newPoolSize} freshly allocated buffers of {@code bufferSize} bytes.
	 * Throws an OutOfMemory error if unable to allocate as asked.
	 *
	 * @param newPoolSize number of buffers to allocate
	 * @param bufferSize capacity of each buffer in bytes
	 * @throws IllegalArgumentException if {@code bufferSize} is smaller than {@link #MIN_BUFFER_SIZE}
	 */
	public synchronized void resize(int newPoolSize, int bufferSize) {
		if (bufferSize < MIN_BUFFER_SIZE) {
			throw new IllegalArgumentException("'bufferSize' must be at-least: " + MIN_BUFFER_SIZE + " bytes");
		}

		// The previous queue is dropped before the new buffers are allocated.
		buffers = new ArrayBlockingQueue<>(newPoolSize);
		currentBufferCapacity = bufferSize;
		for (int allocated = 0; allocated < newPoolSize; allocated++) {
			buffers.add(ByteBuffer.allocateDirect(bufferSize));
		}
	}

	/**
	 * Requests a direct {@link ByteBuffer} from our pool. Returns null if pool is empty.
	 */
	public synchronized ByteBuffer poll() {
		return buffers.poll();
	}

	/**
	 * Returns a direct {@link ByteBuffer} that was borrowed from our pool.
	 * Buffers whose capacity differs from the current buffer size are ignored.
	 */
	public synchronized void add(ByteBuffer buffer) {
		// A mismatch can happen when a resize is called while there are outstanding requests;
		// such older buffers are simply not taken back.
		if (buffer.capacity() == currentBufferCapacity) {
			buffers.offer(buffer);
		}
	}
}

View File

@ -85,6 +85,8 @@ public class FDB {
private volatile boolean netStarted = false;
private volatile boolean netStopped = false;
volatile boolean warnOnUnclosed = true;
// Set through enableDirectBufferQuery() and read whenever a range read is issued, which may
// happen on a different thread; volatile (like the flags above) so the change is visible there.
private volatile boolean enableDirectBufferQueries = false;
private boolean useShutdownHook = true;
private Thread shutdownHook;
private final Semaphore netRunning = new Semaphore(1);
@ -229,6 +231,35 @@ public class FDB {
return apiVersion;
}
/**
 * Enables or disables use of direct {@code ByteBuffer}s for {@code getRange()} queries.
 * The feature is disabled until this method is called with {@code true}.
 *
 * @param enabled Whether direct buffers should be used for {@code getRange()} queries.
 */
public void enableDirectBufferQuery(boolean enabled) {
enableDirectBufferQueries = enabled;
}
/**
 * Determines whether {@code getRange()} queries can use direct buffers from
 * {@link DirectBufferPool} to copy results. This reports the value most recently
 * passed to {@link #enableDirectBufferQuery(boolean)}.
 *
 * @return {@code true} if direct buffer queries have been enabled and {@code false} otherwise
 */
public boolean isDirectBufferQueriesEnabled() {
return enableDirectBufferQueries;
}
/**
 * Resizes the {@link DirectBufferPool} with the given parameters, which is used by
 * {@code getRange()} requests. The pool allocates a fresh set of buffers on every call.
 *
 * @param poolSize Number of buffers in pool
 * @param bufferSize Size of each buffer in bytes
 * @throws IllegalArgumentException if {@code bufferSize} is smaller than
 *         {@code DirectBufferPool.MIN_BUFFER_SIZE}
 */
public void resizeDirectBufferPool(int poolSize, int bufferSize) {
DirectBufferPool.getInstance().resize(poolSize, bufferSize);
}
/**
* Connects to the cluster specified by the
* <a href="/foundationdb/administration.html#default-cluster-file" target="_blank">default fdb.cluster file</a>.
@ -507,4 +538,4 @@ public class FDB {
private native boolean Error_predicate(int predicate, int code);
private native long Database_create(String clusterFilePath) throws FDBException;
}
}

View File

@ -25,6 +25,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.nio.ByteBuffer;
import com.apple.foundationdb.async.AsyncIterable;
import com.apple.foundationdb.async.AsyncUtil;
@ -36,7 +37,6 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
private final TransactionOptions options;
private boolean transactionOwner;
public final ReadTransaction snapshot;
class ReadSnapshot implements ReadTransaction {
@ -369,10 +369,11 @@ class FDBTransaction extends NativeObjectWrapper implements Transaction, OptionC
" -- range get: (%s, %s) limit: %d, bytes: %d, mode: %d, iteration: %d, snap: %s, reverse %s",
begin.toString(), end.toString(), rowLimit, targetBytes, streamingMode,
iteration, Boolean.toString(isSnapshot), Boolean.toString(reverse)));*/
return new FutureResults(Transaction_getRange(
getPtr(), begin.getKey(), begin.orEqual(), begin.getOffset(),
end.getKey(), end.orEqual(), end.getOffset(), rowLimit, targetBytes,
streamingMode, iteration, isSnapshot, reverse), executor);
return new FutureResults(
Transaction_getRange(getPtr(), begin.getKey(), begin.orEqual(), begin.getOffset(),
end.getKey(), end.orEqual(), end.getOffset(), rowLimit, targetBytes,
streamingMode, iteration, isSnapshot, reverse),
FDB.instance().isDirectBufferQueriesEnabled(), executor);
} finally {
pointerReadLock.unlock();
}

View File

@ -20,12 +20,14 @@
package com.apple.foundationdb;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
class FutureResults extends NativeFuture<RangeResultInfo> {
FutureResults(long cPtr, Executor executor) {
FutureResults(long cPtr, boolean enableDirectBufferQueries, Executor executor) {
super(cPtr);
registerMarshalCallback(executor);
this.enableDirectBufferQueries = enableDirectBufferQueries;
}
@Override
@ -44,26 +46,28 @@ class FutureResults extends NativeFuture<RangeResultInfo> {
return new RangeResultInfo(this);
}
public RangeResultSummary getSummary() {
try {
pointerReadLock.lock();
return FutureResults_getSummary(getPtr());
}
finally {
pointerReadLock.unlock();
}
}
public RangeResult getResults() {
ByteBuffer buffer = enableDirectBufferQueries
? DirectBufferPool.getInstance().poll()
: null;
try {
pointerReadLock.lock();
return FutureResults_get(getPtr());
}
finally {
if (buffer != null) {
try (DirectBufferIterator directIterator = new DirectBufferIterator(buffer)) {
FutureResults_getDirect(getPtr(), directIterator.getBuffer(), directIterator.getBuffer().capacity());
return new RangeResult(directIterator);
}
} else {
return FutureResults_get(getPtr());
}
} finally {
pointerReadLock.unlock();
}
}
private native RangeResultSummary FutureResults_getSummary(long ptr) throws FDBException;
private boolean enableDirectBufferQueries = false;
private native RangeResult FutureResults_get(long cPtr) throws FDBException;
private native void FutureResults_getDirect(long cPtr, ByteBuffer buffer, int capacity)
throws FDBException;
}

View File

@ -152,8 +152,6 @@ class RangeQuery implements AsyncIterable<KeyValue> {
@Override
public void accept(RangeResultInfo data, Throwable error) {
try {
final RangeResultSummary summary;
if(error != null) {
promise.completeExceptionally(error);
if(error instanceof Error) {
@ -163,7 +161,8 @@ class RangeQuery implements AsyncIterable<KeyValue> {
return;
}
summary = data.getSummary();
final RangeResult rangeResult = data.get();
final RangeResultSummary summary = rangeResult.getSummary();
if(summary.lastKey == null) {
promise.complete(Boolean.FALSE);
return;
@ -186,11 +185,11 @@ class RangeQuery implements AsyncIterable<KeyValue> {
// If this is the first fetch or the main chunk is exhausted
if(chunk == null || index == chunk.values.size()) {
nextChunk = null;
chunk = data.get();
chunk = rangeResult;
index = 0;
}
else {
nextChunk = data.get();
nextChunk = rangeResult;
}
}

View File

@ -51,4 +51,22 @@ class RangeResult {
}
this.more = more;
}
/**
 * Builds a result by draining a direct-buffer iterator: the header is decoded first,
 * then exactly as many pairs as the header announces are copied out.
 */
RangeResult(DirectBufferIterator iterator) {
	// Decoding the header also positions the iterator at the first pair.
	iterator.readResultsSummary();
	more = iterator.hasMore();

	final int pairCount = iterator.count();
	values = new ArrayList<KeyValue>(pairCount);

	int remaining = pairCount;
	while (remaining > 0) {
		values.add(iterator.next());
		remaining--;
	}
}
/**
 * Summarizes this result: the key of the last pair (or {@code null} when there are no pairs),
 * the number of pairs, and the {@code more} flag.
 */
public RangeResultSummary getSummary() {
	final int keyCount = values.size();

	byte[] lastKey = null;
	if (keyCount > 0) {
		lastKey = values.get(keyCount - 1).getKey();
	}

	return new RangeResultSummary(lastKey, keyCount, more);
}
}

View File

@ -21,10 +21,6 @@
package com.apple.foundationdb;
class RangeResultInfo {
RangeResultSummary getSummary() {
return f.getSummary();
}
RangeResult get() {
return f.getResults();
}

View File

@ -57,8 +57,12 @@ RUN cd /opt/ && curl -L https://github.com/facebook/rocksdb/archive/v6.10.1.tar.
echo "d573d2f15cdda883714f7e0bc87b814a8d4a53a82edde558f08f940e905541ee rocksdb.tar.gz" > rocksdb-sha.txt &&\
sha256sum -c rocksdb-sha.txt && tar xf rocksdb.tar.gz && rm -rf rocksdb.tar.gz rocksdb-sha.txt
LABEL version=0.1.14
ENV DOCKER_IMAGEVER=0.1.14
# Localize time zone
ARG TIMEZONEINFO=America/Los_Angeles
RUN rm -f /etc/localtime && ln -s /usr/share/zoneinfo/${TIMEZONEINFO} /etc/localtime
LABEL version=0.1.15
ENV DOCKER_IMAGEVER=0.1.15
ENV JAVA_HOME=/usr/lib/jvm/java-1.8.0
ENV CC=/opt/rh/devtoolset-8/root/usr/bin/gcc
ENV CXX=/opt/rh/devtoolset-8/root/usr/bin/g++

View File

@ -1,4 +1,4 @@
FROM foundationdb/foundationdb-build:0.1.14
FROM foundationdb/foundationdb-build:0.1.15
USER root
@ -7,7 +7,7 @@ ADD artifacts /mnt/artifacts
# Install build tools for building via make
RUN \
yum install -y distcc-server gperf rubygems python34 libmpc-devel npm
yum install -y distcc-server gperf rubygems python34 libmpc-devel npm cgdb
# Download and install llvm-10.0.0
RUN cd / &&\
@ -17,7 +17,9 @@ RUN cd / &&\
# Download and install gcc-9.3.0
RUN cd / &&\
curl -L $FDB_ARTIFACTSURL/downloads/docker/foundationdb-dev/gcc-9.3.0.tar.gz | tar -xvz
curl -L $FDB_ARTIFACTSURL/downloads/docker/foundationdb-dev/gcc-9.3.0.tar.gz | tar -xvz &&\
mv -iv /usr/local/bin/go /usr/local/bin/go.gcc93 &&\
mv -iv /usr/local/bin/gofmt /usr/local/bin/gofmt.gcc93
# Download and install distcc 3.3.2 new centos binaries
RUN cd / &&\
@ -48,8 +50,8 @@ RUN cp -iv /usr/local/bin/clang++ /usr/local/bin/clang++.deref &&\
ldconfig &&\
rm -rf /mnt/artifacts
LABEL version=0.11.6
ENV DOCKER_IMAGEVER=0.11.6
LABEL version=0.11.8
ENV DOCKER_IMAGEVER=0.11.8
ENV CLANGCC=/usr/local/bin/clang.de8a65ef
ENV CLANGCXX=/usr/local/bin/clang++.de8a65ef

View File

@ -2,7 +2,7 @@ version: "3"
services:
common: &common
image: foundationdb/foundationdb-build:0.1.13
image: foundationdb/foundationdb-build:0.1.15
build-setup: &build-setup
<<: *common

View File

@ -10,38 +10,38 @@ macOS
The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server.
* `FoundationDB-6.3.4.pkg <https://www.foundationdb.org/downloads/6.3.4/macOS/installers/FoundationDB-6.3.4.pkg>`_
* `FoundationDB-6.3.5.pkg <https://www.foundationdb.org/downloads/6.3.5/macOS/installers/FoundationDB-6.3.5.pkg>`_
Ubuntu
------
The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x.
* `foundationdb-clients-6.3.4-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.4/ubuntu/installers/foundationdb-clients_6.3.4-1_amd64.deb>`_
* `foundationdb-server-6.3.4-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.4/ubuntu/installers/foundationdb-server_6.3.4-1_amd64.deb>`_ (depends on the clients package)
* `foundationdb-clients-6.3.5-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.5/ubuntu/installers/foundationdb-clients_6.3.5-1_amd64.deb>`_
* `foundationdb-server-6.3.5-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.5/ubuntu/installers/foundationdb-server_6.3.5-1_amd64.deb>`_ (depends on the clients package)
RHEL/CentOS EL6
---------------
The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x.
* `foundationdb-clients-6.3.4-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.4/rhel6/installers/foundationdb-clients-6.3.4-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.3.4-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.4/rhel6/installers/foundationdb-server-6.3.4-1.el6.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.3.5-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.5/rhel6/installers/foundationdb-clients-6.3.5-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.3.5-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.5/rhel6/installers/foundationdb-server-6.3.5-1.el6.x86_64.rpm>`_ (depends on the clients package)
RHEL/CentOS EL7
---------------
The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x.
* `foundationdb-clients-6.3.4-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.4/rhel7/installers/foundationdb-clients-6.3.4-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.3.4-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.4/rhel7/installers/foundationdb-server-6.3.4-1.el7.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.3.5-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.5/rhel7/installers/foundationdb-clients-6.3.5-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.3.5-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.5/rhel7/installers/foundationdb-server-6.3.5-1.el7.x86_64.rpm>`_ (depends on the clients package)
Windows
-------
The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server.
* `foundationdb-6.3.4-x64.msi <https://www.foundationdb.org/downloads/6.3.4/windows/installers/foundationdb-6.3.4-x64.msi>`_
* `foundationdb-6.3.5-x64.msi <https://www.foundationdb.org/downloads/6.3.5/windows/installers/foundationdb-6.3.5-x64.msi>`_
API Language Bindings
=====================
@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part
If you need to use the FoundationDB Python API from other Python installations or paths, use the Python package manager ``pip`` (``pip install foundationdb``) or download the Python package:
* `foundationdb-6.3.4.tar.gz <https://www.foundationdb.org/downloads/6.3.4/bindings/python/foundationdb-6.3.4.tar.gz>`_
* `foundationdb-6.3.5.tar.gz <https://www.foundationdb.org/downloads/6.3.5/bindings/python/foundationdb-6.3.5.tar.gz>`_
Ruby 1.9.3/2.0.0+
-----------------
* `fdb-6.3.4.gem <https://www.foundationdb.org/downloads/6.3.4/bindings/ruby/fdb-6.3.4.gem>`_
* `fdb-6.3.5.gem <https://www.foundationdb.org/downloads/6.3.5/bindings/ruby/fdb-6.3.5.gem>`_
Java 8+
-------
* `fdb-java-6.3.4.jar <https://www.foundationdb.org/downloads/6.3.4/bindings/java/fdb-java-6.3.4.jar>`_
* `fdb-java-6.3.4-javadoc.jar <https://www.foundationdb.org/downloads/6.3.4/bindings/java/fdb-java-6.3.4-javadoc.jar>`_
* `fdb-java-6.3.5.jar <https://www.foundationdb.org/downloads/6.3.5/bindings/java/fdb-java-6.3.5.jar>`_
* `fdb-java-6.3.5-javadoc.jar <https://www.foundationdb.org/downloads/6.3.5/bindings/java/fdb-java-6.3.5-javadoc.jar>`_
Go 1.11+
--------

View File

@ -2,6 +2,12 @@
Release Notes
#############
6.2.25
======
* Mitigate an issue where a non-lockaware transaction that changes certain ``\xff`` "metadata" keys, committed concurrently with locking the database, can cause corruption. If a non-lockaware transaction manually sets its read version to a version where the database is locked, and changes metadata keys, this can still cause corruption. `(PR #3674) <https://github.com/apple/foundationdb/pull/3674>`_
* Reset network connections between the proxies and satellite tlogs if the latencies are larger than 500ms. `(PR #3686) <https://github.com/apple/foundationdb/pull/3686>`_
6.2.24
======

View File

@ -2,12 +2,6 @@
Release Notes
#############
6.3.5
=====
* Fix an issue where ``fdbcli --exec 'exclude no_wait ...'`` would incorrectly report that processes can safely be removed from the cluster. `(PR #3566) <https://github.com/apple/foundationdb/pull/3566>`_
* When a configuration key is changed, it will always be included in ``status json`` output, even if the value is reverted back to the default value. `(PR #3610) <https://github.com/apple/foundationdb/pull/3610>`_
6.3.4
=====
@ -66,6 +60,7 @@ Fixes
* Prevent blob upload timeout if request timeout is lower than expected request time. `(PR #3533) <https://github.com/apple/foundationdb/pull/3533>`_
* In very rare scenarios, the data distributor process would crash when being shutdown. `(PR #3530) <https://github.com/apple/foundationdb/pull/3530>`_
* The master would die immediately if it did not have the correct cluster controller interface when recruited. [6.3.4] `(PR #3537) <https://github.com/apple/foundationdb/pull/3537>`_
* Fix an issue where ``fdbcli --exec 'exclude no_wait ...'`` would incorrectly report that processes can safely be removed from the cluster. [6.3.5] `(PR #3566) <https://github.com/apple/foundationdb/pull/3566>`_
Status
------
@ -74,6 +69,7 @@ Status
* Replaced ``cluster.database_locked`` status field with ``cluster.database_lock_state``, which contains two subfields: ``locked`` (boolean) and ``lock_uid`` (which contains the database lock uid if the database is locked). `(PR #2058) <https://github.com/apple/foundationdb/pull/2058>`_
* Removed fields ``worst_version_lag_storage_server`` and ``limiting_version_lag_storage_server`` from the ``cluster.qos`` section. The ``worst_data_lag_storage_server`` and ``limiting_data_lag_storage_server`` objects can be used instead. `(PR #3196) <https://github.com/apple/foundationdb/pull/3196>`_
* If a process is unable to flush trace logs to disk, the problem will now be reported via the output of ``status`` command inside ``fdbcli``. `(PR #2605) <https://github.com/apple/foundationdb/pull/2605>`_ `(PR #2820) <https://github.com/apple/foundationdb/pull/2820>`_
* When a configuration key is changed, it will always be included in ``status json`` output, even if the value is reverted back to the default value. [6.3.5] `(PR #3610) <https://github.com/apple/foundationdb/pull/3610>`_
Bindings
--------
@ -84,6 +80,8 @@ Bindings
* Java: Optimize byte array comparisons in ``ByteArrayUtil``. `(PR #2823) <https://github.com/apple/foundationdb/pull/2823>`_
* Java: Add ``FDB.disableShutdownHook`` that can be used to prevent the default shutdown hook from running. Users of this new function should make sure to call ``stopNetwork`` before terminating a client process. `(PR #2635) <https://github.com/apple/foundationdb/pull/2635>`_
* Java: Introduced ``keyAfter`` utility function that can be used to create the immediate next key for a given byte array. `(PR #2458) <https://github.com/apple/foundationdb/pull/2458>`_
* Java: Combined ``getSummary()`` and ``getResults()`` JNI calls for ``getRange()`` queries. [6.3.5] `(PR #3681) <https://github.com/apple/foundationdb/pull/3681>`_
* Java: Added support to use ``DirectByteBuffers`` in ``getRange()`` requests for better performance, which can be enabled using ``FDB.enableDirectBufferQuery``. [6.3.5] `(PR #3681) <https://github.com/apple/foundationdb/pull/3681>`_
* Golang: The ``Transact`` function will unwrap errors that have been wrapped using ``xerrors`` to determine if a retryable FoundationDB error is in the error chain. `(PR #3131) <https://github.com/apple/foundationdb/pull/3131>`_
* Golang: Added ``Subspace.PackWithVersionstamp`` that can be used to pack a ``Tuple`` that contains a versionstamp. `(PR #2243) <https://github.com/apple/foundationdb/pull/2243>`_
* Golang: Implement ``Stringer`` interface for ``Tuple``, ``Subspace``, ``UUID``, and ``Versionstamp``. `(PR #3032) <https://github.com/apple/foundationdb/pull/3032>`_
@ -116,6 +114,7 @@ Fixes from previous versions
* The 6.3.1 patch release includes all fixes from the patch releases 6.2.21 and 6.2.22. :doc:`(6.2 Release Notes) </release-notes/release-notes-620>`
* The 6.3.3 patch release includes all fixes from the patch release 6.2.23. :doc:`(6.2 Release Notes) </release-notes/release-notes-620>`
* The 6.3.5 patch release includes all fixes from the patch releases 6.2.24 and 6.2.25. :doc:`(6.2 Release Notes) </release-notes/release-notes-620>`
Fixes only impacting 6.3.0+
---------------------------
@ -124,6 +123,7 @@ Fixes only impacting 6.3.0+
* Renamed ``MIN_DELAY_STORAGE_CANDIDACY_SECONDS`` knob to ``MIN_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS``. [6.3.2] `(PR #3327) <https://github.com/apple/foundationdb/pull/3327>`_
* Refreshing TLS certificates could cause crashes. [6.3.2] `(PR #3352) <https://github.com/apple/foundationdb/pull/3352>`_
* All storage class processes attempted to connect to the same coordinator. [6.3.2] `(PR #3361) <https://github.com/apple/foundationdb/pull/3361>`_
* Adjusted the proxy load balancing algorithm to be based on the CPU usage of the process instead of the number of requests processed. [6.3.5] `(PR #3653) <https://github.com/apple/foundationdb/pull/3653>`_
Earlier release notes
---------------------

View File

@ -27,6 +27,7 @@
#include "flow/UnitTest.h"
#include "flow/Hash3.h"
#include "fdbrpc/AsyncFileReadAhead.actor.h"
#include "fdbrpc/simulator.h"
#include "flow/Platform.h"
#include "fdbclient/AsyncFileBlobStore.actor.h"
#include "fdbclient/Status.h"
@ -1614,9 +1615,14 @@ public:
std::string fullPath = joinPath(m_path, path);
#ifndef _WIN32
if(g_network->isSimulated()) {
if(!fileExists(fullPath))
if(!fileExists(fullPath)) {
throw file_not_found();
std::string uniquePath = fullPath + "." + deterministicRandom()->randomUniqueID().toString() + ".lnk";
}
if (g_simulator.getCurrentProcess()->uid == UID()) {
TraceEvent(SevError, "BackupContainerReadFileOnUnsetProcessID");
}
std::string uniquePath = fullPath + "." + g_simulator.getCurrentProcess()->uid.toString() + ".lnk";
unlink(uniquePath.c_str());
ASSERT(symlink(basename(path).c_str(), uniquePath.c_str()) == 0);
fullPath = uniquePath;

View File

@ -92,7 +92,7 @@ void ClientKnobs::initialize(bool randomize) {
init( AGGREGATE_HEALTH_METRICS_MAX_STALENESS, 0.5 );
init( DETAILED_HEALTH_METRICS_MAX_STALENESS, 5.0 );
init( MID_SHARD_SIZE_MAX_STALENESS, 10.0 );
init( TAG_ENCODE_KEY_SERVERS, true ); if( randomize && BUGGIFY ) TAG_ENCODE_KEY_SERVERS = false;
init( TAG_ENCODE_KEY_SERVERS, false ); if( randomize && BUGGIFY ) TAG_ENCODE_KEY_SERVERS = true;
//KeyRangeMap
init( KRM_GET_RANGE_LIMIT, 1e5 ); if( randomize && BUGGIFY ) KRM_GET_RANGE_LIMIT = 10;

View File

@ -196,7 +196,7 @@ struct GetReadVersionReply : public BasicLoadBalancedReply {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, BasicLoadBalancedReply::recentRequests, version, locked, metadataVersion, tagThrottleInfo, midShardSize);
serializer(ar, BasicLoadBalancedReply::processBusyTime, version, locked, metadataVersion, tagThrottleInfo, midShardSize);
}
};

View File

@ -112,7 +112,7 @@ struct RestoreRoleInterface {
UID id() const { return nodeID; }
std::string toString() {
std::string toString() const {
std::stringstream ss;
ss << "Role:" << getRoleStr(role) << " interfID:" << nodeID.toString();
return ss.str();
@ -201,7 +201,7 @@ struct RestoreApplierInterface : RestoreRoleInterface {
collectRestoreRoleInterfaces, finishRestore);
}
std::string toString() { return nodeID.toString(); }
std::string toString() const { return nodeID.toString(); }
};
struct RestoreControllerInterface : RestoreRoleInterface {
@ -226,7 +226,7 @@ struct RestoreControllerInterface : RestoreRoleInterface {
serializer(ar, *(RestoreRoleInterface*)this, samples);
}
std::string toString() { return nodeID.toString(); }
std::string toString() const { return nodeID.toString(); }
};
// RestoreAsset uniquely identifies the work unit done by restore roles;
@ -249,29 +249,31 @@ struct RestoreAsset {
Key addPrefix;
Key removePrefix;
int batchIndex; // for progress tracking and performance investigation
RestoreAsset() = default;
// Q: Can we simply use uid for == and use different comparison rule for less than operator.
// The ordering of RestoreAsset may change, will that affect correctness or performance?
bool operator==(const RestoreAsset& r) const {
return beginVersion == r.beginVersion && endVersion == r.endVersion && range == r.range &&
fileIndex == r.fileIndex && partitionId == r.partitionId && filename == r.filename &&
return batchIndex == r.batchIndex && beginVersion == r.beginVersion && endVersion == r.endVersion &&
range == r.range && fileIndex == r.fileIndex && partitionId == r.partitionId && filename == r.filename &&
offset == r.offset && len == r.len && addPrefix == r.addPrefix && removePrefix == r.removePrefix;
}
bool operator!=(const RestoreAsset& r) const {
return !(*this == r);
}
bool operator<(const RestoreAsset& r) const {
return std::make_tuple(fileIndex, filename, offset, len, beginVersion, endVersion, range.begin, range.end,
addPrefix, removePrefix) < std::make_tuple(r.fileIndex, r.filename, r.offset, r.len,
r.beginVersion, r.endVersion, r.range.begin,
r.range.end, r.addPrefix, r.removePrefix);
return std::make_tuple(batchIndex, fileIndex, filename, offset, len, beginVersion, endVersion, range.begin,
range.end, addPrefix, removePrefix) <
std::make_tuple(r.batchIndex, r.fileIndex, r.filename, r.offset, r.len, r.beginVersion, r.endVersion,
r.range.begin, r.range.end, r.addPrefix, r.removePrefix);
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, uid, beginVersion, endVersion, range, filename, fileIndex, partitionId, offset, len, addPrefix,
removePrefix);
removePrefix, batchIndex);
}
std::string toString() const {
@ -279,7 +281,8 @@ struct RestoreAsset {
ss << "UID:" << uid.toString() << " begin:" << beginVersion << " end:" << endVersion
<< " range:" << range.toString() << " filename:" << filename << " fileIndex:" << fileIndex
<< " partitionId:" << partitionId << " offset:" << offset << " len:" << len
<< " addPrefix:" << addPrefix.toString() << " removePrefix:" << removePrefix.toString();
<< " addPrefix:" << addPrefix.toString() << " removePrefix:" << removePrefix.toString()
<< " BatchIndex:" << batchIndex;
return ss.str();
}
@ -342,7 +345,7 @@ struct LoadingParam {
serializer(ar, isRangeFile, url, rangeVersion, blockSize, asset);
}
std::string toString() {
std::string toString() const {
std::stringstream str;
str << "isRangeFile:" << isRangeFile << " url:" << url.toString()
<< " rangeVersion:" << (rangeVersion.present() ? rangeVersion.get() : -1) << " blockSize:" << blockSize
@ -370,7 +373,7 @@ struct RestoreRecruitRoleReply : TimedRequest {
serializer(ar, id, role, loader, applier);
}
std::string toString() {
std::string toString() const {
std::stringstream ss;
ss << "roleInterf role:" << getRoleStr(role) << " replyID:" << id.toString();
if (loader.present()) {
@ -402,14 +405,14 @@ struct RestoreRecruitRoleRequest : TimedRequest {
serializer(ar, ci, role, nodeIndex, reply);
}
std::string printable() {
std::string printable() const {
std::stringstream ss;
ss << "RestoreRecruitRoleRequest Role:" << getRoleStr(role) << " NodeIndex:" << nodeIndex
<< " RestoreController:" << ci.id().toString();
return ss.str();
}
std::string toString() { return printable(); }
std::string toString() const { return printable(); }
};
// Static info. across version batches
@ -431,7 +434,7 @@ struct RestoreSysInfoRequest : TimedRequest {
serializer(ar, sysInfo, rangeVersions, reply);
}
std::string toString() {
std::string toString() const {
std::stringstream ss;
ss << "RestoreSysInfoRequest "
<< "rangeVersions.size:" << rangeVersions.size();
@ -456,7 +459,7 @@ struct RestoreSamplesRequest : TimedRequest {
serializer(ar, id, batchIndex, samples, reply);
}
std::string toString() {
std::string toString() const {
std::stringstream ss;
ss << "ID:" << id.toString() << " BatchIndex:" << batchIndex << " samples:" << samples.size();
return ss.str();
@ -477,7 +480,7 @@ struct RestoreLoadFileReply : TimedRequest {
serializer(ar, param, isDuplicated);
}
std::string toString() {
std::string toString() const {
std::stringstream ss;
ss << "LoadingParam:" << param.toString() << " isDuplicated:" << isDuplicated;
return ss.str();
@ -496,12 +499,14 @@ struct RestoreLoadFileRequest : TimedRequest {
RestoreLoadFileRequest() = default;
explicit RestoreLoadFileRequest(int batchIndex, LoadingParam& param) : batchIndex(batchIndex), param(param){};
// Reversed comparison: a request with a larger batchIndex compares as "less" than one with a smaller batchIndex.
// NOTE(review): presumably so that a max-heap (e.g. std::priority_queue) yields the smallest batchIndex first -
// confirm against the container that holds these requests.
bool operator<(RestoreLoadFileRequest const& rhs) const { return batchIndex > rhs.batchIndex; }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, batchIndex, param, reply);
}
std::string toString() {
std::string toString() const {
std::stringstream ss;
ss << "RestoreLoadFileRequest batchIndex:" << batchIndex << " param:" << param.toString();
return ss.str();
@ -521,12 +526,14 @@ struct RestoreSendMutationsToAppliersRequest : TimedRequest {
explicit RestoreSendMutationsToAppliersRequest(int batchIndex, std::map<Key, UID> rangeToApplier, bool useRangeFile)
: batchIndex(batchIndex), rangeToApplier(rangeToApplier), useRangeFile(useRangeFile) {}
// Reversed comparison: a request with a larger batchIndex compares as "less" than one with a smaller batchIndex.
// NOTE(review): presumably so that a max-heap (e.g. std::priority_queue) yields the smallest batchIndex first -
// confirm against the container that holds these requests.
bool operator<(RestoreSendMutationsToAppliersRequest const& rhs) const { return batchIndex > rhs.batchIndex; }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, batchIndex, rangeToApplier, useRangeFile, reply);
}
std::string toString() {
std::string toString() const {
std::stringstream ss;
ss << "RestoreSendMutationsToAppliersRequest batchIndex:" << batchIndex
<< " keyToAppliers.size:" << rangeToApplier.size() << " useRangeFile:" << useRangeFile;
@ -552,10 +559,10 @@ struct RestoreSendVersionedMutationsRequest : TimedRequest {
: batchIndex(batchIndex), asset(asset), msgIndex(msgIndex), isRangeFile(isRangeFile),
versionedMutations(versionedMutations) {}
std::string toString() {
std::string toString() const {
std::stringstream ss;
ss << "VersionBatchIndex:" << batchIndex << "RestoreAsset:" << asset.toString() << " msgIndex:" << msgIndex
<< " isRangeFile:" << isRangeFile << " versionedMutations.size:" << versionedMutations.size();
ss << "VersionBatchIndex:" << batchIndex << " msgIndex:" << msgIndex << " isRangeFile:" << isRangeFile
<< " versionedMutations.size:" << versionedMutations.size() << " RestoreAsset:" << asset.toString();
return ss.str();
}
@ -580,7 +587,7 @@ struct RestoreVersionBatchRequest : TimedRequest {
serializer(ar, batchIndex, reply);
}
std::string toString() {
std::string toString() const {
std::stringstream ss;
ss << "RestoreVersionBatchRequest batchIndex:" << batchIndex;
return ss.str();
@ -602,7 +609,7 @@ struct RestoreFinishRequest : TimedRequest {
serializer(ar, terminate, reply);
}
std::string toString() {
std::string toString() const {
std::stringstream ss;
ss << "RestoreFinishRequest terminate:" << terminate;
return ss.str();

View File

@ -906,6 +906,7 @@ std::pair<MetricNameRef, KeyRef> decodeMetricConfKey( KeyRef const& prefix, KeyR
const KeyRef maxUIDKey = LiteralStringRef("\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff");
const KeyRef databaseLockedKey = LiteralStringRef("\xff/dbLocked");
const KeyRef databaseLockedKeyEnd = LiteralStringRef("\xff/dbLocked\x00");
const KeyRef metadataVersionKey = LiteralStringRef("\xff/metadataVersion");
const KeyRef metadataVersionKeyEnd = LiteralStringRef("\xff/metadataVersion\x00");
const KeyRef metadataVersionRequiredValue = LiteralStringRef("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00");

View File

@ -351,6 +351,7 @@ extern const KeyRef metricConfPrefix;
extern const KeyRef maxUIDKey;
extern const KeyRef databaseLockedKey;
extern const KeyRef databaseLockedKeyEnd;
extern const KeyRef metadataVersionKey;
extern const KeyRef metadataVersionKeyEnd;
extern const KeyRef metadataVersionRequiredValue;

View File

@ -551,6 +551,7 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
}
} else {
self->outgoingConnectionIdle = false;
self->lastConnectTime = now();
}
firstConnFailedTime.reset();
@ -724,7 +725,7 @@ void Peer::onIncomingConnection( Reference<Peer> self, Reference<IConnection> co
compatibleAddr = transport->localAddresses.secondaryAddress.get();
}
if ( !destination.isPublic() || outgoingConnectionIdle || destination > compatibleAddr ) {
if ( !destination.isPublic() || outgoingConnectionIdle || destination > compatibleAddr || (lastConnectTime > 1.0 && now() - lastConnectTime > FLOW_KNOBS->ALWAYS_ACCEPT_DELAY) ) {
// Keep the new connection
TraceEvent("IncomingConnection", conn->getDebugID())
.suppressFor(1.0)

View File

@ -458,8 +458,8 @@ Future< REPLY_TYPE(Request) > loadBalance(
// Subclasses must initialize all members in their default constructors
// Subclasses must serialize all members
struct BasicLoadBalancedReply {
int recentRequests;
BasicLoadBalancedReply() : recentRequests(0) {}
int processBusyTime;
BasicLoadBalancedReply() : processBusyTime(0) {}
};
Optional<BasicLoadBalancedReply> getBasicLoadBalancedReply(const BasicLoadBalancedReply *reply);
@ -528,7 +528,7 @@ Future< REPLY_TYPE(Request) > basicLoadBalance(
if(result.present()) {
Optional<BasicLoadBalancedReply> loadBalancedReply = getBasicLoadBalancedReply(&result.get());
if(loadBalancedReply.present()) {
alternatives->updateRecent( useAlt, loadBalancedReply.get().recentRequests );
alternatives->updateRecent( useAlt, loadBalancedReply.get().processBusyTime );
}
return result.get();

View File

@ -67,10 +67,10 @@ struct AlternativeInfo {
T interf;
double probability;
double cumulativeProbability;
int recentRequests;
int processBusyTime;
double lastUpdate;
AlternativeInfo(T const& interf, double probability, double cumulativeProbability) : interf(interf), probability(probability), cumulativeProbability(cumulativeProbability), recentRequests(-1), lastUpdate(0) {}
AlternativeInfo(T const& interf, double probability, double cumulativeProbability) : interf(interf), probability(probability), cumulativeProbability(cumulativeProbability), processBusyTime(-1), lastUpdate(0) {}
bool operator < (double const& r) const {
return cumulativeProbability < r;
@ -105,26 +105,28 @@ public:
return std::lower_bound( alternatives.begin(), alternatives.end(), deterministicRandom()->random01() ) - alternatives.begin();
}
void updateRecent( int index, int recentRequests ) {
alternatives[index].recentRequests = recentRequests;
void updateRecent( int index, int processBusyTime ) {
alternatives[index].processBusyTime = processBusyTime;
alternatives[index].lastUpdate = now();
}
void updateProbabilities() {
double totalRequests = 0;
double totalBusyTime = 0;
for(auto& it : alternatives) {
totalRequests += it.recentRequests;
totalBusyTime += it.processBusyTime;
if(now() - it.lastUpdate > FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/2.0) {
return;
}
}
if(totalRequests < 1000) {
//Do not update probabilities if the average busy time per alternative is below BASIC_LOAD_BALANCE_MIN_AMOUNT (presumably 5% busyness at the default knob value - confirm)
if(totalBusyTime < FLOW_KNOBS->BASIC_LOAD_BALANCE_MIN_AMOUNT*alternatives.size()) {
return;
}
double totalProbability = 0;
for(auto& it : alternatives) {
it.probability += (1.0/alternatives.size()-(it.recentRequests/totalRequests))*FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_CHANGE;
it.probability += (1.0/alternatives.size()-(it.processBusyTime/totalBusyTime))*FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_CHANGE;
it.probability = std::max(it.probability, 1/(FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_PROB*alternatives.size()));
it.probability = std::min(it.probability, FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_PROB/alternatives.size());
totalProbability += it.probability;

View File

@ -66,12 +66,16 @@ public:
uint64_t fault_injection_r;
double fault_injection_p1, fault_injection_p2;
UID uid;
ProcessInfo(const char* name, LocalityData locality, ProcessClass startingClass, NetworkAddressList addresses,
INetworkConnections* net, const char* dataFolder, const char* coordinationFolder)
: name(name), locality(locality), startingClass(startingClass), addresses(addresses),
address(addresses.address), dataFolder(dataFolder), network(net), coordinationFolder(coordinationFolder),
failed(false), excluded(false), rebooting(false), fault_injection_p1(0), fault_injection_p2(0),
fault_injection_r(0), machine(0), cleared(false) {}
fault_injection_r(0), machine(0), cleared(false) {
uid = deterministicRandom()->randomUniqueID();
}
Future<KillType> onShutdown() { return shutdownSignal.getFuture(); }

View File

@ -157,6 +157,7 @@ set(FDBSERVER_SRCS
workloads/KVStoreTest.actor.cpp
workloads/KillRegion.actor.cpp
workloads/LockDatabase.actor.cpp
workloads/LockDatabaseFrequently.actor.cpp
workloads/LocalRatekeeper.actor.cpp
workloads/LogMetrics.actor.cpp
workloads/LowLatency.actor.cpp

View File

@ -2773,10 +2773,7 @@ ACTOR Future<Void> dbInfoUpdater( ClusterControllerData* self ) {
TraceEvent("DBInfoStartBroadcast", self->id);
choose {
when(std::vector<Endpoint> notUpdated = wait( broadcastDBInfoRequest(req, SERVER_KNOBS->DBINFO_SEND_AMOUNT, Optional<Endpoint>(), false) )) {
TraceEvent("DBInfoFinishBroadcast", self->id);
for(auto &it : notUpdated) {
TraceEvent("DBInfoNotUpdated", self->id).detail("Addr", it.getPrimaryAddress());
}
TraceEvent("DBInfoFinishBroadcast", self->id).detail("NotUpdated", notUpdated.size());
if(notUpdated.size()) {
self->updateDBInfoEndpoints.insert(notUpdated.begin(), notUpdated.end());
self->updateDBInfo.trigger();

View File

@ -884,8 +884,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// If unhealthy team is majority, we may not find an ok dest in this while loop
Reference<IDataDistributionTeam> dest = deterministicRandom()->randomChoice(self->teams);
bool ok = dest->isHealthy() &&
(!req.preferLowerUtilization || dest->hasHealthyAvailableSpace(self->medianAvailableSpace));
bool ok = dest->isHealthy() && (!req.preferLowerUtilization ||
dest->hasHealthyAvailableSpace(self->medianAvailableSpace));
for(int i=0; ok && i<randomTeams.size(); i++) {
if (randomTeams[i]->getServerIDs() == dest->getServerIDs()) {

View File

@ -94,8 +94,13 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( PEEK_MAX_LATENCY, 0.5 ); if ( randomize && BUGGIFY ) PEEK_MAX_LATENCY = 0.0;
init( PEEK_COUNT_SMALL_MESSAGES, false ); if ( randomize && BUGGIFY ) PEEK_COUNT_SMALL_MESSAGES = true;
init( PEEK_STATS_INTERVAL, 10.0 );
init( PEEK_STATS_SLOW_AMOUNT, 0 );
init( PEEK_STATS_SLOW_AMOUNT, 2 );
init( PEEK_STATS_SLOW_RATIO, 0.5 );
init( PUSH_RESET_INTERVAL, 300.0 ); if ( randomize && BUGGIFY ) PUSH_RESET_INTERVAL = 20.0;
init( PUSH_MAX_LATENCY, 0.5 ); if ( randomize && BUGGIFY ) PUSH_MAX_LATENCY = 0.0;
init( PUSH_STATS_INTERVAL, 10.0 );
init( PUSH_STATS_SLOW_AMOUNT, 2 );
init( PUSH_STATS_SLOW_RATIO, 0.5 );
// disk snapshot max timeout, to be put in TLog, storage and coordinator nodes
init( SNAP_CREATE_MAX_TIMEOUT, 300.0 );
@ -529,7 +534,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( BYTES_READ_UNITS_PER_SAMPLE, 100000 ); // 100K bytes
init( READ_HOT_SUB_RANGE_CHUNK_SIZE, 10000000); // 10MB
init( EMPTY_READ_PENALTY, 20 ); // 20 bytes
init( READ_SAMPLING_ENABLED, true ); if ( randomize && BUGGIFY ) READ_SAMPLING_ENABLED = false;// enable/disable read sampling
init( READ_SAMPLING_ENABLED, false ); if ( randomize && BUGGIFY ) READ_SAMPLING_ENABLED = true;// enable/disable read sampling
//Storage Server
init( STORAGE_LOGGING_DELAY, 5.0 );
@ -612,13 +617,13 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( FASTRESTORE_FAILURE_TIMEOUT, 3600 );
init( FASTRESTORE_HEARTBEAT_INTERVAL, 60 );
init( FASTRESTORE_SAMPLING_PERCENT, 100 ); if( randomize && BUGGIFY ) { FASTRESTORE_SAMPLING_PERCENT = deterministicRandom()->random01() * 100; }
init( FASTRESTORE_NUM_LOADERS, 2 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_LOADERS = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_NUM_LOADERS, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_LOADERS = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_NUM_APPLIERS, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_APPLIERS = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_TXN_BATCH_MAX_BYTES, 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_BATCH_MAX_BYTES = deterministicRandom()->random01() * 1024.0 * 1024.0 + 1.0; }
init( FASTRESTORE_VERSIONBATCH_MAX_BYTES, 10.0 * 1024.0 * 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_VERSIONBATCH_MAX_BYTES = deterministicRandom()->random01() * 10.0 * 1024.0 * 1024.0 * 1024.0; } // too small value may increase chance of TooManyFile error
init( FASTRESTORE_VB_PARALLELISM, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_PARALLELISM = deterministicRandom()->random01() * 20 + 1; }
init( FASTRESTORE_TXN_BATCH_MAX_BYTES, 1024.0 * 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_BATCH_MAX_BYTES = deterministicRandom()->random01() * 1024.0 * 1024.0 + 1.0; }
init( FASTRESTORE_VERSIONBATCH_MAX_BYTES, 10.0 * 1024.0 * 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_VERSIONBATCH_MAX_BYTES = deterministicRandom()->random01() < 0.2 ? 5 * 1024 : deterministicRandom()->random01() < 0.4 ? 100 * 1024 * 1024 : deterministicRandom()->random01() * 1000.0 * 1024.0 * 1024.0; } // too small value may increase chance of TooManyFile error
init( FASTRESTORE_VB_PARALLELISM, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_PARALLELISM = deterministicRandom()->random01() < 0.2 ? 2 : deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_VB_MONITOR_DELAY, 30 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_MONITOR_DELAY = deterministicRandom()->random01() * 20 + 1; }
init( FASTRESTORE_VB_LAUNCH_DELAY, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_LAUNCH_DELAY = deterministicRandom()->random01() * 60 + 1; }
init( FASTRESTORE_VB_LAUNCH_DELAY, 1.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_LAUNCH_DELAY = deterministicRandom()->random01() < 0.2 ? 0.1 : deterministicRandom()->random01() * 10.0 + 1; }
init( FASTRESTORE_ROLE_LOGGING_DELAY, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_ROLE_LOGGING_DELAY = deterministicRandom()->random01() * 60 + 1; }
init( FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL = deterministicRandom()->random01() * 60 + 1; }
init( FASTRESTORE_ATOMICOP_WEIGHT, 1 ); if( randomize && BUGGIFY ) { FASTRESTORE_ATOMICOP_WEIGHT = deterministicRandom()->random01() * 200 + 1; }
@ -632,7 +637,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( FASTRESTORE_HEARTBEAT_DELAY, 10 ); if( randomize && BUGGIFY ) { FASTRESTORE_HEARTBEAT_DELAY = deterministicRandom()->random01() * 120 + 2; }
init( FASTRESTORE_HEARTBEAT_MAX_DELAY, 10 ); if( randomize && BUGGIFY ) { FASTRESTORE_HEARTBEAT_MAX_DELAY = FASTRESTORE_HEARTBEAT_DELAY * 10; }
init( FASTRESTORE_APPLIER_FETCH_KEYS_SIZE, 100 ); if( randomize && BUGGIFY ) { FASTRESTORE_APPLIER_FETCH_KEYS_SIZE = deterministicRandom()->random01() * 10240 + 1; }
init( FASTRESTORE_LOADER_SEND_MUTATION_MSG_BYTES, 1.0 * 1024.0 * 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_LOADER_SEND_MUTATION_MSG_BYTES = deterministicRandom()->random01() * 10.0 * 1024.0 * 1024.0 + 1; }
init( FASTRESTORE_LOADER_SEND_MUTATION_MSG_BYTES, 1.0 * 1024.0 * 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_LOADER_SEND_MUTATION_MSG_BYTES = deterministicRandom()->random01() < 0.2 ? 1024 : deterministicRandom()->random01() * 5.0 * 1024.0 * 1024.0 + 1; }
init( FASTRESTORE_GET_RANGE_VERSIONS_EXPENSIVE, false ); if( randomize && BUGGIFY ) { FASTRESTORE_GET_RANGE_VERSIONS_EXPENSIVE = deterministicRandom()->random01() < 0.5 ? true : false; }
init( FASTRESTORE_REQBATCH_PARALLEL, 50 ); if( randomize && BUGGIFY ) { FASTRESTORE_REQBATCH_PARALLEL = deterministicRandom()->random01() * 100 + 1; }
init( FASTRESTORE_REQBATCH_LOG, false ); if( randomize && BUGGIFY ) { FASTRESTORE_REQBATCH_LOG = deterministicRandom()->random01() < 0.2 ? true : false; }
@ -643,6 +648,17 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( FASTRESTORE_USE_RANGE_FILE, true ); // Perf test only: setting it to false will cause simulation failure
init( FASTRESTORE_USE_LOG_FILE, true ); // Perf test only: setting it to false will cause simulation failure
init( FASTRESTORE_SAMPLE_MSG_BYTES, 1048576 ); if( randomize && BUGGIFY ) { FASTRESTORE_SAMPLE_MSG_BYTES = deterministicRandom()->random01() * 2048;}
init( FASTRESTORE_SCHED_UPDATE_DELAY, 0.1 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_UPDATE_DELAY = deterministicRandom()->random01() * 2;}
init( FASTRESTORE_SCHED_TARGET_CPU_PERCENT, 70 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_TARGET_CPU_PERCENT = deterministicRandom()->random01() * 100 + 50;} // simulated CPU usage can be larger than 100
init( FASTRESTORE_SCHED_MAX_CPU_PERCENT, 90 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_MAX_CPU_PERCENT = FASTRESTORE_SCHED_TARGET_CPU_PERCENT + deterministicRandom()->random01() * 100;}
init( FASTRESTORE_SCHED_INFLIGHT_LOAD_REQS, 50 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_INFLIGHT_LOAD_REQS = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 30 + 1;}
init( FASTRESTORE_SCHED_INFLIGHT_SEND_REQS, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_INFLIGHT_SEND_REQS = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 10 + 1;}
init( FASTRESTORE_SCHED_LOAD_REQ_BATCHSIZE, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_LOAD_REQ_BATCHSIZE = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 10 + 1;}
init( FASTRESTORE_SCHED_INFLIGHT_SENDPARAM_THRESHOLD, 10 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_INFLIGHT_SENDPARAM_THRESHOLD = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 15 + 1;}
init( FASTRESTORE_SCHED_SEND_FUTURE_VB_REQS_BATCH, 2 ); if( randomize && BUGGIFY ) { FASTRESTORE_SCHED_SEND_FUTURE_VB_REQS_BATCH = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 15 + 1;}
init( FASTRESTORE_NUM_TRACE_EVENTS, 100 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_TRACE_EVENTS = deterministicRandom()->random01() < 0.2 ? 1 : deterministicRandom()->random01() * 500 + 1;}
init( FASTRESTORE_EXPENSIVE_VALIDATION, false ); if( randomize && BUGGIFY ) { FASTRESTORE_EXPENSIVE_VALIDATION = deterministicRandom()->random01() < 0.5 ? true : false;}
init( REDWOOD_DEFAULT_PAGE_SIZE, 4096 );
init( REDWOOD_KVSTORE_CONCURRENT_READS, 64 );

View File

@ -94,6 +94,11 @@ public:
double PEEK_STATS_INTERVAL;
double PEEK_STATS_SLOW_AMOUNT;
double PEEK_STATS_SLOW_RATIO;
double PUSH_RESET_INTERVAL;
double PUSH_MAX_LATENCY;
double PUSH_STATS_INTERVAL;
double PUSH_STATS_SLOW_AMOUNT;
double PUSH_STATS_SLOW_RATIO;
// Data distribution queue
double HEALTH_POLL_TIME;
@ -550,7 +555,7 @@ public:
// FASTRESTORE_VB_PARALLELISM is the number of concurrently running version batches
int64_t FASTRESTORE_VB_PARALLELISM;
int64_t FASTRESTORE_VB_MONITOR_DELAY; // How quickly monitor finished version batch
int64_t FASTRESTORE_VB_LAUNCH_DELAY;
double FASTRESTORE_VB_LAUNCH_DELAY;
int64_t FASTRESTORE_ROLE_LOGGING_DELAY;
int64_t FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL; // How quickly to update process metrics for restore
int64_t FASTRESTORE_ATOMICOP_WEIGHT; // workload amplification factor for atomic ops
@ -575,6 +580,16 @@ public:
bool FASTRESTORE_USE_RANGE_FILE; // use range file in backup
bool FASTRESTORE_USE_LOG_FILE; // use log file in backup
int64_t FASTRESTORE_SAMPLE_MSG_BYTES; // sample message desired size
double FASTRESTORE_SCHED_UPDATE_DELAY; // delay in seconds in updating process metrics
int FASTRESTORE_SCHED_TARGET_CPU_PERCENT; // release as many requests as possible when cpu usage is below the knob
int FASTRESTORE_SCHED_MAX_CPU_PERCENT; // max cpu percent when scheduler shall not release non-urgent requests
int FASTRESTORE_SCHED_INFLIGHT_LOAD_REQS; // number of inflight requests to load backup files
int FASTRESTORE_SCHED_INFLIGHT_SEND_REQS; // number of inflight requests for loaders to send mutations to appliers
int FASTRESTORE_SCHED_LOAD_REQ_BATCHSIZE; // number of load requests to release at once
int FASTRESTORE_SCHED_INFLIGHT_SENDPARAM_THRESHOLD; // we can send future VB requests if it is less than this knob
int FASTRESTORE_SCHED_SEND_FUTURE_VB_REQS_BATCH; // number of future VB sendLoadingParam requests to process at once
int FASTRESTORE_NUM_TRACE_EVENTS; // NOTE(review): undocumented; presumably bounds how many trace events restore emits - confirm
bool FASTRESTORE_EXPENSIVE_VALIDATION; // when set true, performance will be heavily affected
int REDWOOD_DEFAULT_PAGE_SIZE; // Page size for new Redwood files
int REDWOOD_KVSTORE_CONCURRENT_READS; // Max number of simultaneous point or range reads in progress.

View File

@ -37,12 +37,22 @@ struct DBCoreState;
struct TLogSet;
struct CoreTLogSet;
// Reference-counted bookkeeping used to decide when a connection should be reset:
// the time of the last reset, running counts of slow and fast replies, and a
// resetCheck future. LogSet stores a vector of these in connectionResetTrackers.
// NOTE(review): presumably one tracker per log server, evaluated against the
// PUSH_* knobs - confirm against the code that consumes it.
struct ConnectionResetInfo : public ReferenceCounted<ConnectionResetInfo> {
	double lastReset; // set to now() at construction
	Future<Void> resetCheck; // starts out as an already-ready Void future
	int slowReplies; // starts at 0
	int fastReplies; // starts at 0

	// Initializers are listed in declaration order, which is the order in which
	// the members are actually initialized (avoids -Wreorder warnings).
	ConnectionResetInfo() : lastReset(now()), resetCheck(Void()), slowReplies(0), fastReplies(0) {}
};
// The set of tLog servers, logRouters and backupWorkers for a log tag
class LogSet : NonCopyable, public ReferenceCounted<LogSet> {
public:
std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> logServers;
std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> logRouters;
std::vector<Reference<AsyncVar<OptionalInterface<BackupInterface>>>> backupWorkers;
std::vector<Reference<ConnectionResetInfo>> connectionResetTrackers;
int32_t tLogWriteAntiQuorum;
int32_t tLogReplicationFactor;
std::vector< LocalityData > tLogLocalities; // Stores the localities of the log servers

View File

@ -141,7 +141,7 @@ ACTOR Future<Void> resetChecker( ILogSystem::ServerPeekCursor* self, NetworkAddr
self->unknownReplies = 0;
self->fastReplies = 0;
wait(delay(SERVER_KNOBS->PEEK_STATS_INTERVAL));
TraceEvent("SlowPeekStats").detail("SlowReplies", self->slowReplies).detail("FastReplies", self->fastReplies).detail("UnknownReplies", self->unknownReplies);
TraceEvent("SlowPeekStats").detail("PeerAddress", addr).detail("SlowReplies", self->slowReplies).detail("FastReplies", self->fastReplies).detail("UnknownReplies", self->unknownReplies);
if(self->slowReplies >= SERVER_KNOBS->PEEK_STATS_SLOW_AMOUNT && self->slowReplies/double(self->slowReplies+self->fastReplies) >= SERVER_KNOBS->PEEK_STATS_SLOW_RATIO) {
FlowTransport::transport().resetConnection(addr);
self->lastReset = now();

View File

@ -241,7 +241,6 @@ ACTOR Future<Void> queueTransactionStartRequests(
loop choose{
when(GetReadVersionRequest req = waitNext(readVersionRequests)) {
//WARNING: this code is run at a high priority, so it needs to do as little work as possible
stats->addRequest();
if( stats->txnRequestIn.getValue() - stats->txnRequestOut.getValue() > SERVER_KNOBS->START_TRANSACTION_MAX_QUEUE_SIZE ) {
++stats->txnRequestErrors;
//FIXME: send an error instead of giving an unreadable version when the client can support the error: req.reply.sendError(proxy_memory_limit_exceeded());
@ -340,7 +339,8 @@ struct ResolutionRequestBuilder {
return *out;
}
void addTransaction(CommitTransactionRef& trIn, int transactionNumberInBatch) {
void addTransaction(CommitTransactionRequest& trRequest, int transactionNumberInBatch) {
auto& trIn = trRequest.transaction;
// SOMEDAY: There are a couple of unnecessary O( # resolvers ) steps here
outTr.assign(requests.size(), NULL);
ASSERT( transactionNumberInBatch >= 0 && transactionNumberInBatch < 32768 );
@ -358,6 +358,13 @@ struct ResolutionRequestBuilder {
getOutTransaction(0, trIn.read_snapshot).mutations.push_back(requests[0].arena, m);
}
}
if (isTXNStateTransaction && !trRequest.isLockAware()) {
// This mitigates https://github.com/apple/foundationdb/issues/3647. Since this transaction is not lock
// aware, if this transaction got a read version then \xff/dbLocked must not have been set at this
// transaction's read snapshot. If that changes by commit time, then it won't commit on any proxy because of
// a conflict. A client could set a read version manually so this isn't totally bulletproof.
trIn.read_conflict_ranges.push_back(trRequest.arena, KeyRangeRef(databaseLockedKey, databaseLockedKeyEnd));
}
std::vector<std::vector<int>> rCRIndexMap(
requests.size()); // [resolver_index][read_conflict_range_index_on_the_resolver]
// -> read_conflict_range's original index
@ -469,7 +476,6 @@ ACTOR Future<Void> commitBatcher(ProxyCommitData *commitData, PromiseStream<std:
choose{
when(CommitTransactionRequest req = waitNext(in)) {
//WARNING: this code is run at a high priority, so it needs to do as little work as possible
commitData->stats.addRequest();
int bytes = getBytes(req);
// Drop requests if memory is under severe pressure
@ -862,7 +868,7 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
int conflictRangeCount = 0;
self->maxTransactionBytes = 0;
for (int t = 0; t < trs.size(); t++) {
requests.addTransaction(trs[t].transaction, t);
requests.addTransaction(trs[t], t);
conflictRangeCount +=
trs[t].transaction.read_conflict_ranges.size() + trs[t].transaction.write_conflict_ranges.size();
//TraceEvent("MPTransactionDump", self->dbgid).detail("Snapshot", trs[t].transaction.read_snapshot);
@ -1580,7 +1586,7 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan, Pro
if (replyFromMaster.version > rep.version) {
rep = replyFromMaster;
}
rep.recentRequests = commitData->stats.getRecentRequests();
rep.processBusyTime = 1e6 * (g_network->isSimulated() ? deterministicRandom()->random01() : g_network->networkInfo.metrics.lastRunLoopBusyness);
if (debugID.present()) {
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.After");
@ -1858,7 +1864,6 @@ ACTOR static Future<Void> readRequestServer( MasterProxyInterface proxy, Promise
loop {
GetKeyServerLocationsRequest req = waitNext(proxy.getKeyServersLocations.getFuture());
//WARNING: this code is run at a high priority, so it needs to do as little work as possible
commitData->stats.addRequest();
if(req.limit != CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT && //Always do data distribution requests
commitData->stats.keyServerLocationIn.getValue() - commitData->stats.keyServerLocationOut.getValue() > SERVER_KNOBS->KEY_LOCATION_MAX_QUEUE_SIZE) {
++commitData->stats.keyServerLocationErrors;

View File

@ -1300,6 +1300,11 @@ namespace oldTLog_4_6 {
DUMPTOKEN( recruited.lock );
DUMPTOKEN( recruited.getQueuingMetrics );
DUMPTOKEN( recruited.confirmRunning );
DUMPTOKEN( recruited.waitFailure );
DUMPTOKEN( recruited.recoveryFinished );
DUMPTOKEN( recruited.disablePopRequest );
DUMPTOKEN( recruited.enablePopRequest );
DUMPTOKEN( recruited.snapRequest );
logData = Reference<LogData>( new LogData(self, recruited) );
logData->stopped = true;

View File

@ -2136,6 +2136,11 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
DUMPTOKEN( recruited.lock );
DUMPTOKEN( recruited.getQueuingMetrics );
DUMPTOKEN( recruited.confirmRunning );
DUMPTOKEN( recruited.waitFailure );
DUMPTOKEN( recruited.recoveryFinished );
DUMPTOKEN( recruited.disablePopRequest );
DUMPTOKEN( recruited.enablePopRequest );
DUMPTOKEN( recruited.snapRequest );
//We do not need the remoteTag, because we will not be loading any additional data
logData = Reference<LogData>( new LogData(self, recruited, Tag(), true, id_logRouterTags[id1], id_txsTags[id1], UID(), std::vector<Tag>(), "Restored") );
@ -2303,6 +2308,11 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
DUMPTOKEN( recruited.lock );
DUMPTOKEN( recruited.getQueuingMetrics );
DUMPTOKEN( recruited.confirmRunning );
DUMPTOKEN( recruited.waitFailure );
DUMPTOKEN( recruited.recoveryFinished );
DUMPTOKEN( recruited.disablePopRequest );
DUMPTOKEN( recruited.enablePopRequest );
DUMPTOKEN( recruited.snapRequest );
for(auto it : self->id_data) {
if( !it.second->stopped ) {

View File

@ -74,36 +74,9 @@ struct ProxyStats {
Future<Void> logger;
int recentRequests;
Deque<int> requestBuckets;
double lastBucketBegin;
double bucketInterval;
void updateRequestBuckets() {
while (now() - lastBucketBegin > bucketInterval) {
lastBucketBegin += bucketInterval;
recentRequests -= requestBuckets.front();
requestBuckets.pop_front();
requestBuckets.push_back(0);
}
}
void addRequest() {
updateRequestBuckets();
++recentRequests;
++requestBuckets.back();
}
int getRecentRequests() {
updateRequestBuckets();
return recentRequests * FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE /
(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE - (lastBucketBegin + bucketInterval - now()));
}
explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion,
int64_t* commitBatchesMemBytesCountPtr)
: cc("ProxyStats", id.toString()), recentRequests(0), lastBucketBegin(now()),
bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE / FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
: cc("ProxyStats", id.toString()),
txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc), txnRequestErrors("TxnRequestErrors", cc),
txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc), txnStartBatch("TxnStartBatch", cc),
txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc),
@ -125,17 +98,14 @@ struct ProxyStats {
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
grvLatencySample("GRVLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
commitLatencyBands("CommitLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
grvLatencyBands("GRVLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
commitLatencyBands("CommitLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
grvLatencyBands("GRVLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
specialCounter(cc, "LastAssignedCommitVersion", [this]() { return this->lastCommitVersionAssigned; });
specialCounter(cc, "Version", [pVersion]() { return *pVersion; });
specialCounter(cc, "CommittedVersion", [pCommittedVersion]() { return pCommittedVersion->get(); });
specialCounter(cc, "CommitBatchesMemBytesCount",
[commitBatchesMemBytesCountPtr]() { return *commitBatchesMemBytesCountPtr; });
logger = traceCounters("ProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ProxyMetrics");
for (int i = 0; i < FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS; i++) {
requestBuckets.push_back(0);
}
}
};

View File

@ -77,6 +77,7 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
}
when(RestoreFinishRequest req = waitNext(applierInterf.finishRestore.getFuture())) {
requestTypeStr = "finishRestore";
actors.clear(false); // cancel all pending actors
handleFinishRestoreRequest(req, self);
if (req.terminate) {
exitRole = Void();
@ -88,6 +89,7 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
break;
}
}
TraceEvent("RestoreApplierCore", self->id()).detail("Request", requestTypeStr); // For debug only
} catch (Error& e) {
TraceEvent(SevWarn, "FastRestoreApplierError", self->id())
.detail("RequestType", requestTypeStr)
@ -108,12 +110,23 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
// Only one actor can process mutations from the same file.
ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMutationsRequest req,
Reference<RestoreApplierData> self) {
state Reference<ApplierBatchData> batchData = self->batch[req.batchIndex];
// Assume: processedFileState[req.asset] will not be erased while the actor is active.
// Note: Insert new items into processedFileState will not invalidate the reference.
state NotifiedVersion& curMsgIndex = batchData->processedFileState[req.asset];
state Reference<ApplierBatchData> batchData; // initialized as nullptr
state bool printTrace = false;
state NotifiedVersion* curMsgIndex = nullptr;
if (req.batchIndex <= self->finishedBatch.get()) { // Handle duplicate request from batchIndex that has finished
TraceEvent(SevWarn, "FastRestoreApplierRestoreSendVersionedMutationsRequestTooLate")
.detail("RequestBatchIndex", req.batchIndex)
.detail("FinishedBatchIndex", self->finishedBatch.get());
req.reply.send(RestoreCommonReply(self->id(), true));
ASSERT_WE_THINK(false); // Test to see if simulation can reproduce this
return Void();
}
batchData = self->batch[req.batchIndex];
ASSERT(batchData.isValid());
ASSERT(self->finishedBatch.get() < req.batchIndex);
// wait(delay(0.0, TaskPriority::RestoreApplierReceiveMutations)); // This hurts performance from 100MB/s to 60MB/s
// on circus
@ -121,11 +134,11 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
// Trace when the receive phase starts at a VB and when it finishes.
// This can help check if receiveMutations block applyMutation phase.
// If so, we need more sophisticated scheduler to ensure priority execution
printTrace = (batchData->receiveMutationReqs % 100 == 1);
printTrace = (batchData->receiveMutationReqs % SERVER_KNOBS->FASTRESTORE_NUM_TRACE_EVENTS == 0);
TraceEvent(printTrace ? SevInfo : SevFRDebugInfo, "FastRestoreApplierPhaseReceiveMutations", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("RestoreAsset", req.asset.toString())
.detail("RestoreAssetMesssageIndex", curMsgIndex.get())
.detail("RestoreAssetMesssageIndex", batchData->processedFileState[req.asset].get())
.detail("Request", req.toString())
.detail("CurrentMemory", getSystemStatistics().processMemory)
.detail("PreviousVersionBatchState", batchData->vbState.get())
@ -133,11 +146,16 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
wait(isSchedulable(self, req.batchIndex, __FUNCTION__));
wait(curMsgIndex.whenAtLeast(req.msgIndex - 1));
ASSERT(batchData.isValid());
ASSERT(req.batchIndex > self->finishedBatch.get());
// Assume: processedFileState[req.asset] will not be erased while the actor is active.
// Note: Insert new items into processedFileState will not invalidate the reference.
curMsgIndex = &batchData->processedFileState[req.asset];
wait(curMsgIndex->whenAtLeast(req.msgIndex - 1));
batchData->vbState = ApplierVersionBatchState::RECEIVE_MUTATIONS;
state bool isDuplicated = true;
if (curMsgIndex.get() == req.msgIndex - 1) {
if (curMsgIndex->get() == req.msgIndex - 1) {
isDuplicated = false;
for (int mIndex = 0; mIndex < req.versionedMutations.size(); mIndex++) {
@ -165,14 +183,14 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
ASSERT(versionedMutation.mutation.type != MutationRef::SetVersionstampedKey &&
versionedMutation.mutation.type != MutationRef::SetVersionstampedValue);
}
curMsgIndex.set(req.msgIndex);
curMsgIndex->set(req.msgIndex);
}
req.reply.send(RestoreCommonReply(self->id(), isDuplicated));
TraceEvent(printTrace ? SevInfo : SevFRDebugInfo, "FastRestoreApplierPhaseReceiveMutationsDone", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("RestoreAsset", req.asset.toString())
.detail("ProcessedMessageIndex", curMsgIndex.get())
.detail("ProcessedMessageIndex", curMsgIndex->get())
.detail("Request", req.toString());
return Void();
}
@ -398,6 +416,7 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
std::map<Key, std::map<Key, StagingKey>::iterator> incompleteStagingKeys;
std::map<Key, StagingKey>::iterator stagingKeyIter = batchData->stagingKeys.begin();
int numKeysInBatch = 0;
int numGetTxns = 0;
double delayTime = 0; // Start transactions at different time to avoid overwhelming FDB.
for (; stagingKeyIter != batchData->stagingKeys.end(); stagingKeyIter++) {
if (!stagingKeyIter->second.hasBaseValue()) {
@ -407,12 +426,14 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
if (numKeysInBatch == SERVER_KNOBS->FASTRESTORE_APPLIER_FETCH_KEYS_SIZE) {
fGetAndComputeKeys.push_back(getAndComputeStagingKeys(incompleteStagingKeys, delayTime, cx, applierID,
batchIndex, &batchData->counters));
numGetTxns++;
delayTime += SERVER_KNOBS->FASTRESTORE_TXN_EXTRA_DELAY;
numKeysInBatch = 0;
incompleteStagingKeys.clear();
}
}
if (numKeysInBatch > 0) {
numGetTxns++;
fGetAndComputeKeys.push_back(getAndComputeStagingKeys(incompleteStagingKeys, delayTime, cx, applierID,
batchIndex, &batchData->counters));
}
@ -420,7 +441,8 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
.detail("BatchIndex", batchIndex)
.detail("Step", "Compute the other staging keys")
.detail("StagingKeys", batchData->stagingKeys.size());
.detail("StagingKeys", batchData->stagingKeys.size())
.detail("GetStagingKeyBatchTxns", numGetTxns);
// Pre-compute pendingMutations to other keys in stagingKeys that has base value
for (stagingKeyIter = batchData->stagingKeys.begin(); stagingKeyIter != batchData->stagingKeys.end();
stagingKeyIter++) {
@ -577,8 +599,10 @@ ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req,
wait(self->finishedBatch.whenAtLeast(req.batchIndex - 1));
state bool isDuplicated = true;
if (self->finishedBatch.get() == req.batchIndex - 1) {
if (self->finishedBatch.get() ==
req.batchIndex - 1) { // duplicate request from earlier version batch will be ignored
Reference<ApplierBatchData> batchData = self->batch[req.batchIndex];
ASSERT(batchData.isValid());
TraceEvent("FastRestoreApplierPhaseHandleApplyToDBRunning", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("FinishedBatch", self->finishedBatch.get())
@ -604,7 +628,7 @@ ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req,
// Avoid setting finishedBatch when finishedBatch > req.batchIndex
if (self->finishedBatch.get() == req.batchIndex - 1) {
self->finishedBatch.set(req.batchIndex);
self->batch[req.batchIndex]->vbState = ApplierVersionBatchState::DONE;
// self->batch[req.batchIndex]->vbState = ApplierVersionBatchState::DONE;
// Free memory for the version batch
self->batch.erase(req.batchIndex);
if (self->delayedActors > 0) {

View File

@ -115,7 +115,7 @@ struct StagingKey {
// Precompute the final value of the key.
// TODO: Look at the last LogMessageVersion, if it set or clear, we can ignore the rest of versions.
void precomputeResult(const char* context, UID applierID, int batchIndex) {
TraceEvent(SevDebug, "FastRestoreApplierPrecomputeResult", applierID)
TraceEvent(SevFRMutationInfo, "FastRestoreApplierPrecomputeResult", applierID)
.detail("BatchIndex", batchIndex)
.detail("Context", context)
.detail("Version", version.toString())

View File

@ -84,6 +84,7 @@ ACTOR Future<Void> sampleBackups(Reference<RestoreControllerData> self, RestoreC
ASSERT(req.batchIndex <= self->batch.size()); // batchIndex starts from 1
Reference<ControllerBatchData> batch = self->batch[req.batchIndex];
ASSERT(batch.isValid());
if (batch->sampleMsgs.find(req.id) != batch->sampleMsgs.end()) {
req.reply.send(RestoreCommonReply(req.id));
continue;
@ -164,7 +165,10 @@ ACTOR Future<Void> recruitRestoreRoles(Reference<RestoreWorkerData> controllerWo
break;
}
TraceEvent("FastRestoreController", controllerData->id()).detail("WorkerNode", workerInterf.first);
TraceEvent("FastRestoreController", controllerData->id())
.detail("WorkerNode", workerInterf.first)
.detail("NodeRole", role)
.detail("NodeIndex", nodeIndex);
requests.emplace_back(workerInterf.first,
RestoreRecruitRoleRequest(controllerWorker->controllerInterf.get(), role, nodeIndex));
nodeIndex++;

View File

@ -221,6 +221,7 @@ struct RestoreControllerData : RestoreRoleData, public ReferenceCounted<RestoreC
}
TraceEvent("FastRestoreVersionBatchesSummary")
.detail("VersionBatches", versionBatches.size())
.detail("LogFiles", logFiles)
.detail("RangeFiles", rangeFiles)
.detail("LogBytes", logSize)
@ -314,6 +315,7 @@ struct RestoreControllerData : RestoreRoleData, public ReferenceCounted<RestoreC
}
}
} else {
// TODO: Check why this may happen?!
TraceEvent(SevError, "FastRestoreBuildVersionBatch")
.detail("RangeIndex", rangeIdx)
.detail("RangeFiles", rangeFiles.size())

View File

@ -50,9 +50,11 @@ void handleRestoreSysInfoRequest(const RestoreSysInfoRequest& req, Reference<Res
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self);
ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequest req,
Reference<RestoreLoaderData> self);
ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int batchIndex, RestoreAsset asset,
bool isRangeFile, std::map<Key, UID>* pRangeToApplier,
std::map<UID, RestoreApplierInterface>* pApplierInterfaces);
// Forward declaration. Sends the parsed mutations of one restore asset to the appliers.
// sendLoadParamQueue: scheduler queue; the actor pushes a request onto it and waits to be released.
// inflightSendLoadParamReqs: per-batchIndex counter, incremented once the actor is released and
//                            decremented after all of its sends have completed.
// finishedBatch: compared against batchIndex to detect requests for an already-finished batch.
ACTOR Future<Void> sendMutationsToApplier(
std::priority_queue<RestoreLoaderSchedSendLoadParamRequest>* sendLoadParamQueue,
std::map<int, int>* inflightSendLoadParamReqs, NotifiedVersion* finishedBatch, VersionedMutationsMap* pkvOps,
int batchIndex, RestoreAsset asset, bool isRangeFile, std::map<Key, UID>* pRangeToApplier,
std::map<UID, RestoreApplierInterface>* pApplierInterfaces);
ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pProcessedFileOffset,
SerializedMutationListMap* mutationMap,
Reference<IBackupContainer> bc, RestoreAsset asset);
@ -62,16 +64,169 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
Reference<IBackupContainer> bc, Version version, RestoreAsset asset);
ACTOR Future<Void> handleFinishVersionBatchRequest(RestoreVersionBatchRequest req, Reference<RestoreLoaderData> self);
// Scheduler actor for a restore loader. Each loop iteration releases queued work from three queues
// (sendingQueue, sendLoadParamQueue, loadingQueue), throttled by in-flight counters and CPU-usage knobs,
// and sleeps on hasPendingRequests when all three queues are empty.
// Dispatch requests based on node's busyness (i.e., cpu usage for now) and requests' priorities.
// Requests for earlier version batches are preferred, which is equivalent to
// sendMutationsRequests being preferred over loadingFileRequests.
ACTOR Future<Void> dispatchRequests(Reference<RestoreLoaderData> self) {
try {
// inflightSendLoadParamReqs value for version batch (finishedSendingVB + 1), sampled once per iteration.
state int curVBInflightReqs = 0;
// Number of sendLoadParamQueue entries released by the second (throttled) release loop in an iteration.
state int sendLoadParams = 0;
// Number of load-file requests dispatched in the most recent iteration.
state int lastLoadReqs = 0;
loop {
// Snapshot of the scheduler state at the start of every iteration (values are from the previous pass).
TraceEvent(SevDebug, "FastRestoreLoaderDispatchRequests", self->id())
.detail("SendingQueue", self->sendingQueue.size())
.detail("LoadingQueue", self->loadingQueue.size())
.detail("SendingLoadParamQueue", self->sendLoadParamQueue.size())
.detail("InflightSendingReqs", self->inflightSendingReqs)
.detail("InflightSendingReqsThreshold", SERVER_KNOBS->FASTRESTORE_SCHED_INFLIGHT_SEND_REQS)
.detail("InflightLoadingReqs", self->inflightLoadingReqs)
.detail("InflightLoadingReqsThreshold", SERVER_KNOBS->FASTRESTORE_SCHED_INFLIGHT_LOAD_REQS)
.detail("LastLoadFileRequests", lastLoadReqs)
.detail("LoadFileRequestsBatchThreshold", SERVER_KNOBS->FASTRESTORE_SCHED_LOAD_REQ_BATCHSIZE)
.detail("LastDispatchSendLoadParamReqsForCurrentVB", curVBInflightReqs)
.detail("LastDispatchSendLoadParamReqsForFutureVB", sendLoadParams)
.detail("CpuUsage", self->cpuUsage)
.detail("TargetCpuUsage", SERVER_KNOBS->FASTRESTORE_SCHED_TARGET_CPU_PERCENT)
.detail("MaxCpuUsage", SERVER_KNOBS->FASTRESTORE_SCHED_MAX_CPU_PERCENT);
// TODO: Pop old requests whose version batch <= finishedBatch.get()
// TODO2: Simulate delayed request can be too old by introducing artificial delay
if (SERVER_KNOBS->FASTRESTORE_EXPENSIVE_VALIDATION) {
// Sanity check: All requests before and in finishedBatch must have been processed; otherwise,
// those requests may cause a segmentation fault after the applier removes the batch data.
// Only the top of each queue is inspected; violations are reported but not removed here.
if (!self->loadingQueue.empty() && self->loadingQueue.top().batchIndex <= self->finishedBatch.get()) {
// Still has pending requests from earlier batchIndex and current batchIndex, which should not
// happen
TraceEvent(SevError, "FastRestoreLoaderSchedulerHasOldLoadFileRequests")
.detail("FinishedBatchIndex", self->finishedBatch.get())
.detail("PendingRequest", self->loadingQueue.top().toString());
}
if (!self->sendingQueue.empty() && self->sendingQueue.top().batchIndex <= self->finishedBatch.get()) {
TraceEvent(SevError, "FastRestoreLoaderSchedulerHasOldSendRequests")
.detail("FinishedBatchIndex", self->finishedBatch.get())
.detail("PendingRequest", self->sendingQueue.top().toString());
}
if (!self->sendLoadParamQueue.empty() &&
self->sendLoadParamQueue.top().batchIndex <= self->finishedBatch.get()) {
TraceEvent(SevError, "FastRestoreLoaderSchedulerHasOldSendLoadParamRequests")
.detail("FinishedBatchIndex", self->finishedBatch.get())
.detail("PendingRequest", self->sendLoadParamQueue.top().toString());
}
}
if (!self->sendingQueue.empty()) {
// Only release one sendMutationRequest at a time because it sends all data for a version batch
// and it takes a large amount of resources
const RestoreSendMutationsToAppliersRequest& req = self->sendingQueue.top();
// Dispatch the request if it is the next version batch to process or if cpu usage is low
if (req.batchIndex - 1 == self->finishedSendingVB ||
self->cpuUsage < SERVER_KNOBS->FASTRESTORE_SCHED_TARGET_CPU_PERCENT) {
// The request is handed to the actor before pop(), while the reference to top() is still valid.
self->addActor.send(handleSendMutationsRequest(req, self));
self->sendingQueue.pop();
}
}
// When shall the node pause the process of other requests, e.g., load file requests
// TODO: Revisit if we should have (self->inflightSendingReqs > 0 && self->inflightLoadingReqs > 0)
// Back off only if BOTH kinds of requests are in flight AND at least one limit is reached:
// too many in-flight sends, too many in-flight loads, CPU at/above target while sending,
// or CPU at/above the maximum.
if ((self->inflightSendingReqs > 0 && self->inflightLoadingReqs > 0) &&
(self->inflightSendingReqs >= SERVER_KNOBS->FASTRESTORE_SCHED_INFLIGHT_SEND_REQS ||
self->inflightLoadingReqs >= SERVER_KNOBS->FASTRESTORE_SCHED_INFLIGHT_LOAD_REQS ||
(self->inflightSendingReqs >= 1 &&
self->cpuUsage >= SERVER_KNOBS->FASTRESTORE_SCHED_TARGET_CPU_PERCENT) ||
self->cpuUsage >= SERVER_KNOBS->FASTRESTORE_SCHED_MAX_CPU_PERCENT)) {
if (self->inflightSendingReqs >= SERVER_KNOBS->FASTRESTORE_SCHED_INFLIGHT_SEND_REQS) {
TraceEvent(SevWarn, "FastRestoreLoaderTooManyInflightRequests")
.detail("VersionBatchesBlockedAtSendingMutationsToAppliers", self->inflightSendingReqs)
.detail("CpuUsage", self->cpuUsage)
.detail("InflightSendingReq", self->inflightSendingReqs)
.detail("InflightSendingReqThreshold", SERVER_KNOBS->FASTRESTORE_SCHED_INFLIGHT_SEND_REQS)
.detail("InflightLoadingReq", self->inflightLoadingReqs)
.detail("InflightLoadingReqThreshold", SERVER_KNOBS->FASTRESTORE_SCHED_INFLIGHT_LOAD_REQS);
}
// Sleep, refresh the process stats, and restart the iteration without dispatching anything else.
wait(delay(SERVER_KNOBS->FASTRESTORE_SCHED_UPDATE_DELAY));
updateProcessStats(self);
continue;
}
// Dispatch queued requests of sending mutations per loading param
// First pass: release every request at the top of the queue that is not for a future version batch.
// NOTE(review): stopping at the first future-VB entry presumably relies on the queue ordering
// earlier batchIndex first; the comparator is not visible here - confirm.
while (!self->sendLoadParamQueue.empty()) { // dispatch current VB first
const RestoreLoaderSchedSendLoadParamRequest& req = self->sendLoadParamQueue.top();
if (req.batchIndex - 1 > self->finishedSendingVB) { // future VB
break;
} else {
// Fulfil the promise the queued requester is waiting on, then drop the queue entry.
req.toSched.send(Void());
self->sendLoadParamQueue.pop();
}
}
// Second pass: release remaining (future VB) requests, bounded by the knobs below.
sendLoadParams = 0;
// Note: operator[] creates a zero entry in the map if the key is absent.
curVBInflightReqs = self->inflightSendLoadParamReqs[self->finishedSendingVB + 1];
while (!self->sendLoadParamQueue.empty()) {
const RestoreLoaderSchedSendLoadParamRequest& req = self->sendLoadParamQueue.top();
// curVBInflightReqs is not updated inside this loop; only sendLoadParams changes per release.
if (curVBInflightReqs >= SERVER_KNOBS->FASTRESTORE_SCHED_INFLIGHT_SENDPARAM_THRESHOLD ||
sendLoadParams >= SERVER_KNOBS->FASTRESTORE_SCHED_SEND_FUTURE_VB_REQS_BATCH) {
// Too many future VB requests are released
break;
} else {
req.toSched.send(Void());
self->sendLoadParamQueue.pop();
sendLoadParams++;
}
}
// Dispatch loading backup file requests
// At most FASTRESTORE_SCHED_LOAD_REQ_BATCHSIZE load-file requests are dispatched per iteration.
lastLoadReqs = 0;
while (!self->loadingQueue.empty()) {
if (lastLoadReqs >= SERVER_KNOBS->FASTRESTORE_SCHED_LOAD_REQ_BATCHSIZE) {
break;
}
const RestoreLoadFileRequest& req = self->loadingQueue.top();
if (req.batchIndex <= self->finishedBatch.get()) {
// Request belongs to an already-finished batch: log it, reply with the flag set to true,
// drop it, and then fail the assertion.
TraceEvent(SevError, "FastRestoreLoaderDispatchRestoreLoadFileRequestTooOld")
.detail("FinishedBatchIndex", self->finishedBatch.get())
.detail("RequestBatchIndex", req.batchIndex);
req.reply.send(RestoreLoadFileReply(req.param, true));
self->loadingQueue.pop();
ASSERT(false); // Check if this ever happens easily
} else {
self->addActor.send(handleLoadFileRequest(req, self));
self->loadingQueue.pop();
lastLoadReqs++;
}
}
// Throttle the scheduler itself when CPU usage is at or above the target.
if (self->cpuUsage >= SERVER_KNOBS->FASTRESTORE_SCHED_TARGET_CPU_PERCENT) {
wait(delay(SERVER_KNOBS->FASTRESTORE_SCHED_UPDATE_DELAY));
}
updateProcessStats(self);
// Nothing left to schedule: clear the flag and block until it changes.
if (self->loadingQueue.empty() && self->sendingQueue.empty() && self->sendLoadParamQueue.empty()) {
TraceEvent(SevDebug, "FastRestoreLoaderDispatchRequestsWaitOnRequests", self->id())
.detail("HasPendingRequests", self->hasPendingRequests->get());
self->hasPendingRequests->set(false);
wait(self->hasPendingRequests->onChange()); // CAREFUL:Improper req release may cause restore stuck here
}
}
} catch (Error& e) {
// Any error other than actor cancellation is logged and rethrown; cancellation falls through
// to the normal return below.
if (e.code() != error_code_actor_cancelled) {
TraceEvent(SevError, "FastRestoreLoaderDispatchRequests").error(e, true);
throw e;
}
}
return Void();
}
ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int nodeIndex, Database cx,
RestoreControllerInterface ci) {
state Reference<RestoreLoaderData> self =
Reference<RestoreLoaderData>(new RestoreLoaderData(loaderInterf.id(), nodeIndex, ci));
state ActorCollection actors(false);
state Future<Void> error = actorCollection(self->addActor.getFuture());
state ActorCollection actors(false); // actors whose errors can be ignored
state Future<Void> exitRole = Never();
state bool hasQueuedRequests = false;
actors.add(updateProcessMetrics(self));
actors.add(traceProcessMetrics(self, "RestoreLoader"));
self->addActor.send(dispatchRequests(self));
loop {
state std::string requestTypeStr = "[Init]";
@ -87,12 +242,20 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no
}
when(RestoreLoadFileRequest req = waitNext(loaderInterf.loadFile.getFuture())) {
requestTypeStr = "loadFile";
hasQueuedRequests = !self->loadingQueue.empty() || !self->sendingQueue.empty();
self->initBackupContainer(req.param.url);
actors.add(handleLoadFileRequest(req, self));
self->loadingQueue.push(req);
if (!hasQueuedRequests) {
self->hasPendingRequests->set(true);
}
}
when(RestoreSendMutationsToAppliersRequest req = waitNext(loaderInterf.sendMutations.getFuture())) {
requestTypeStr = "sendMutations";
actors.add(handleSendMutationsRequest(req, self));
hasQueuedRequests = !self->loadingQueue.empty() || !self->sendingQueue.empty();
self->sendingQueue.push(req);
if (!hasQueuedRequests) {
self->hasPendingRequests->set(true);
}
}
when(RestoreVersionBatchRequest req = waitNext(loaderInterf.initVersionBatch.getFuture())) {
requestTypeStr = "initVersionBatch";
@ -114,6 +277,7 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no
TraceEvent("FastRestoreLoaderCoreExitRole", self->id());
break;
}
when(wait(error)) { TraceEvent("FastRestoreLoaderActorCollectionError", self->id()); }
}
} catch (Error& e) {
TraceEvent(e.code() == error_code_broken_promise ? SevError : SevWarnAlways, "FastRestoreLoaderError",
@ -190,8 +354,9 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
int rLen = wait(file->read(mutateString(buf), asset.len, asset.offset));
if (rLen != asset.len) throw restore_bad_read();
TraceEvent("FastRestoreLoader")
.detail("DecodingLogFile", asset.filename)
TraceEvent("FastRestoreLoaderDecodingLogFile")
.detail("BatchIndex", asset.batchIndex)
.detail("Filename", asset.filename)
.detail("Offset", asset.offset)
.detail("Length", asset.len);
@ -285,6 +450,7 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
} catch (Error& e) {
TraceEvent(SevWarn, "FileRestoreCorruptLogFileBlock")
.error(e)
.detail("BatchIndex", asset.batchIndex)
.detail("Filename", file->getFilename())
.detail("BlockOffset", asset.offset)
.detail("BlockLen", asset.len);
@ -306,8 +472,9 @@ ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions, Lo
state std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsPerLPIter = batchData->kvOpsPerLP.end();
state std::map<LoadingParam, SampledMutationsVec>::iterator samplesIter = batchData->sampleMutations.end();
// Q: How to record the param's fields inside LoadingParam? Refer to storageMetrics
TraceEvent("FastRestoreLoaderProcessLoadingParam", loaderID).detail("LoadingParam", param.toString());
TraceEvent("FastRestoreLoaderProcessLoadingParam", loaderID)
.detail("BatchIndex", param.asset.batchIndex)
.detail("LoadingParam", param.toString());
ASSERT(param.blockSize > 0);
ASSERT(param.asset.offset % param.blockSize == 0); // Parse file must be at block boundary.
ASSERT(batchData->kvOpsPerLP.find(param) == batchData->kvOpsPerLP.end());
@ -345,7 +512,9 @@ ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions, Lo
param.asset);
}
TraceEvent("FastRestoreLoaderProcessLoadingParamDone", loaderID).detail("LoadingParam", param.toString());
TraceEvent("FastRestoreLoaderProcessLoadingParamDone", loaderID)
.detail("BatchIndex", param.asset.batchIndex)
.detail("LoadingParam", param.toString());
return Void();
}
@ -356,6 +525,7 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
state bool isDuplicated = true;
state bool printTrace = false;
ASSERT(batchData.isValid());
ASSERT(req.batchIndex > self->finishedBatch.get());
bool paramExist = batchData->processedFileParams.find(req.param) != batchData->processedFileParams.end();
bool isReady = paramExist ? batchData->processedFileParams[req.param].isReady() : false;
@ -380,6 +550,7 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
ASSERT(batchData->sampleMutations.find(req.param) == batchData->sampleMutations.end());
batchData->processedFileParams[req.param] =
_processLoadingParam(&self->rangeVersions, req.param, batchData, self->id(), self->bc);
self->inflightLoadingReqs++;
isDuplicated = false;
} else {
TraceEvent(SevFRDebugInfo, "FastRestoreLoadFile", self->id())
@ -424,6 +595,7 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
}
// Ack restore controller the param is processed
self->inflightLoadingReqs--;
req.reply.send(RestoreLoadFileReply(req.param, isDuplicated));
TraceEvent(printTrace ? SevInfo : SevFRDebugInfo, "FastRestoreLoaderPhaseLoadFileDone", self->id())
.detail("BatchIndex", req.batchIndex)
@ -436,16 +608,29 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
// Do not need to block on low memory usage because this actor should not increase memory usage.
ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequest req,
Reference<RestoreLoaderData> self) {
state Reference<LoaderBatchData> batchData = self->batch[req.batchIndex];
state Reference<LoaderBatchStatus> batchStatus = self->status[req.batchIndex];
state Reference<LoaderBatchData> batchData;
state Reference<LoaderBatchStatus> batchStatus;
state bool isDuplicated = true;
if (req.batchIndex <= self->finishedBatch.get()) {
TraceEvent(SevWarn, "FastRestoreLoaderRestoreSendMutationsToAppliersRequestTooOld")
.detail("FinishedBatchIndex", self->finishedBatch.get())
.detail("RequestBatchIndex", req.batchIndex);
req.reply.send(RestoreCommonReply(self->id(), isDuplicated));
return Void();
}
batchData = self->batch[req.batchIndex];
batchStatus = self->status[req.batchIndex];
ASSERT(batchData.isValid() && batchStatus.isValid());
// The loader destroys batchData once the batch finishes and self->finishedBatch.set(req.batchIndex) is called.
ASSERT(req.batchIndex > self->finishedBatch.get());
TraceEvent("FastRestoreLoaderPhaseSendMutations", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("UseRangeFile", req.useRangeFile)
.detail("LoaderSendStatus", batchStatus->toString());
// The loader destroys batchData once the batch finishes and self->finishedBatch.set(req.batchIndex) is called.
ASSERT(self->finishedBatch.get() < req.batchIndex);
// The VB must finish loading phase before it can send mutations; update finishedLoadingVB for scheduler
self->finishedLoadingVB = std::max(self->finishedLoadingVB, req.batchIndex);
// Ensure each file is sent exactly once by using batchStatus->sendAllLogs and batchStatus->sendAllRanges
if (!req.useRangeFile) {
@ -485,17 +670,20 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
}
if (!isDuplicated) {
self->inflightSendingReqs++;
vector<Future<Void>> fSendMutations;
batchData->rangeToApplier = req.rangeToApplier;
for (auto& [loadParam, kvOps] : batchData->kvOpsPerLP) {
if (loadParam.isRangeFile == req.useRangeFile) {
// Send the parsed mutation to applier who will apply the mutation to DB
fSendMutations.push_back(sendMutationsToApplier(&kvOps, req.batchIndex, loadParam.asset,
loadParam.isRangeFile, &batchData->rangeToApplier,
&self->appliersInterf));
fSendMutations.push_back(
sendMutationsToApplier(&self->sendLoadParamQueue, &self->inflightSendLoadParamReqs,
&self->finishedBatch, &kvOps, req.batchIndex, loadParam.asset,
loadParam.isRangeFile, &batchData->rangeToApplier, &self->appliersInterf));
}
}
wait(waitForAll(fSendMutations));
self->inflightSendingReqs--;
if (req.useRangeFile) {
batchStatus->sendAllRanges = Void(); // Finish sending kvs parsed from range files
} else {
@ -504,6 +692,7 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
if ((batchStatus->sendAllRanges.present() && batchStatus->sendAllRanges.get().isReady()) &&
(batchStatus->sendAllLogs.present() && batchStatus->sendAllLogs.get().isReady())) {
// Both log and range files have been sent.
self->finishedSendingVB = std::max(self->finishedSendingVB, req.batchIndex);
batchData->kvOpsPerLP.clear();
}
}
@ -534,9 +723,11 @@ void buildApplierRangeMap(KeyRangeMap<UID>* krMap, std::map<Key, UID>* pRangeToA
// isRangeFile: is pkvOps from range file? Let receiver (applier) know if the mutation is log mutation;
// pRangeToApplier: range to applierID mapping, deciding which applier is responsible for which range
// pApplierInterfaces: applier interfaces to send the mutations to
ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int batchIndex, RestoreAsset asset,
bool isRangeFile, std::map<Key, UID>* pRangeToApplier,
std::map<UID, RestoreApplierInterface>* pApplierInterfaces) {
ACTOR Future<Void> sendMutationsToApplier(
std::priority_queue<RestoreLoaderSchedSendLoadParamRequest>* sendLoadParamQueue,
std::map<int, int>* inflightSendLoadParamReqs, NotifiedVersion* finishedBatch, VersionedMutationsMap* pkvOps,
int batchIndex, RestoreAsset asset, bool isRangeFile, std::map<Key, UID>* pRangeToApplier,
std::map<UID, RestoreApplierInterface>* pApplierInterfaces) {
state VersionedMutationsMap& kvOps = *pkvOps;
state VersionedMutationsMap::iterator kvOp = kvOps.begin();
state int kvCount = 0;
@ -545,6 +736,20 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
state std::vector<UID> applierIDs = getApplierIDs(*pRangeToApplier);
state double msgSize = 0; // size of mutations in the message
// Wait for scheduler to kick it off
Promise<Void> toSched;
sendLoadParamQueue->push(RestoreLoaderSchedSendLoadParamRequest(batchIndex, toSched, now()));
wait(toSched.getFuture());
if (finishedBatch->get() >= batchIndex) {
TraceEvent(SevError, "FastRestoreLoaderSendMutationToApplierLateRequest")
.detail("FinishedBatchIndex", finishedBatch->get())
.detail("RequestBatchIndex", batchIndex);
ASSERT(false);
return Void();
}
(*inflightSendLoadParamReqs)[batchIndex]++;
TraceEvent("FastRestoreLoaderSendMutationToApplier")
.detail("IsRangeFile", isRangeFile)
.detail("EndVersion", asset.endVersion)
@ -641,7 +846,7 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
applierID, RestoreSendVersionedMutationsRequest(batchIndex, asset, msgIndex, isRangeFile,
applierVersionedMutationsBuffer[applierID]));
}
TraceEvent(SevDebug, "FastRestoreLoaderSendMutationToApplier")
TraceEvent(SevInfo, "FastRestoreLoaderSendMutationToApplier")
.detail("MessageIndex", msgIndex)
.detail("RestoreAsset", asset.toString())
.detail("Requests", requests.size());
@ -665,7 +870,7 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
RestoreSendVersionedMutationsRequest(batchIndex, asset, msgIndex, isRangeFile,
applierVersionedMutationsBuffer[applierID]));
}
TraceEvent(SevDebug, "FastRestoreLoaderSendMutationToApplier")
TraceEvent(SevInfo, "FastRestoreLoaderSendMutationToApplier")
.detail("MessageIndex", msgIndex)
.detail("RestoreAsset", asset.toString())
.detail("Requests", requests.size());
@ -674,11 +879,22 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
}
wait(waitForAll(fSends));
kvOps = VersionedMutationsMap(); // Free memory for parsed mutations at the restore asset.
TraceEvent("FastRestoreLoaderSendMutationToAppliers")
.detail("BatchIndex", batchIndex)
.detail("RestoreAsset", asset.toString())
.detail("Mutations", kvCount);
(*inflightSendLoadParamReqs)[batchIndex]--;
if (finishedBatch->get() < batchIndex) {
kvOps = VersionedMutationsMap(); // Free memory for parsed mutations at the restore asset.
TraceEvent("FastRestoreLoaderSendMutationToApplierDone")
.detail("BatchIndex", batchIndex)
.detail("RestoreAsset", asset.toString())
.detail("Mutations", kvCount);
} else {
TraceEvent(SevWarnAlways, "FastRestoreLoaderSendMutationToApplierDoneTooLate")
.detail("BatchIndex", batchIndex)
.detail("FinishedBatchIndex", finishedBatch->get())
.detail("RestoreAsset", asset.toString())
.detail("Mutations", kvCount);
}
return Void();
}
@ -779,6 +995,7 @@ void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
SerializedMutationListMap& mutationMap = *pmutationMap;
TraceEvent(SevFRMutationInfo, "FastRestoreLoaderParseSerializedLogMutation")
.detail("BatchIndex", asset.batchIndex)
.detail("RestoreAsset", asset.toString());
Arena tempArena;
@ -881,6 +1098,7 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
state SampledMutationsVec& sampleMutations = samplesIter->second;
TraceEvent(SevFRDebugInfo, "FastRestoreDecodedRangeFile")
.detail("BatchIndex", asset.batchIndex)
.detail("Filename", asset.filename)
.detail("Version", version)
.detail("BeginVersion", asset.beginVersion)
@ -895,8 +1113,9 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
try {
Standalone<VectorRef<KeyValueRef>> kvs =
wait(fileBackup::decodeRangeFileBlock(inFile, asset.offset, asset.len));
TraceEvent("FastRestoreLoader")
.detail("DecodedRangeFile", asset.filename)
TraceEvent("FastRestoreLoaderDecodedRangeFile")
.detail("BatchIndex", asset.batchIndex)
.detail("Filename", asset.filename)
.detail("DataSize", kvs.contents().size());
blockData = kvs;
} catch (Error& e) {
@ -951,6 +1170,7 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
// We cache all kv operations into kvOps, and apply all kv operations later in one place
auto it = kvOps.insert(std::make_pair(msgVersion, MutationsVec()));
TraceEvent(SevFRMutationInfo, "FastRestoreDecodeRangeFile")
.detail("BatchIndex", asset.batchIndex)
.detail("CommitVersion", version)
.detail("ParsedMutationKV", m.toString());
@ -980,6 +1200,7 @@ ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pPro
state Standalone<VectorRef<KeyValueRef>> data =
wait(parallelFileRestore::decodeLogFileBlock(inFile, asset.offset, asset.len));
TraceEvent("FastRestoreLoaderDecodeLogFile")
.detail("BatchIndex", asset.batchIndex)
.detail("RestoreAsset", asset.toString())
.detail("DataSize", data.contents().size());
@ -1017,6 +1238,25 @@ ACTOR Future<Void> handleFinishVersionBatchRequest(RestoreVersionBatchRequest re
.detail("RequestedBatchIndex", req.batchIndex);
wait(self->finishedBatch.whenAtLeast(req.batchIndex - 1));
if (self->finishedBatch.get() == req.batchIndex - 1) {
// Sanity check: All requests before and in this batchIndex must have been processed; otherwise,
// those requests may cause a segmentation fault after the applier removes the batch data.
while (!self->loadingQueue.empty() && self->loadingQueue.top().batchIndex <= req.batchIndex) {
// Still has pending requests from earlier batchIndex and current batchIndex, which should not happen
TraceEvent(SevWarn, "FastRestoreLoaderHasPendingLoadFileRequests")
.detail("PendingRequest", self->loadingQueue.top().toString());
self->loadingQueue.pop();
}
while (!self->sendingQueue.empty() && self->sendingQueue.top().batchIndex <= req.batchIndex) {
TraceEvent(SevWarn, "FastRestoreLoaderHasPendingSendRequests")
.detail("PendingRequest", self->sendingQueue.top().toString());
self->sendingQueue.pop();
}
while (!self->sendLoadParamQueue.empty() && self->sendLoadParamQueue.top().batchIndex <= req.batchIndex) {
TraceEvent(SevWarn, "FastRestoreLoaderHasPendingSendLoadParamRequests")
.detail("PendingRequest", self->sendLoadParamQueue.top().toString());
self->sendLoadParamQueue.pop();
}
self->finishedBatch.set(req.batchIndex);
// Clean up batchData
self->batch.erase(req.batchIndex);

View File

@ -93,7 +93,8 @@ struct LoaderBatchData : public ReferenceCounted<LoaderBatchData> {
oldLogMutations("OldLogMutations", cc) {}
} counters;
explicit LoaderBatchData(UID nodeID, int batchIndex) : counters(this, nodeID, batchIndex), vbState(LoaderVersionBatchState::NOT_INIT) {
explicit LoaderBatchData(UID nodeID, int batchIndex)
: counters(this, nodeID, batchIndex), vbState(LoaderVersionBatchState::NOT_INIT), loadFileReqs(0) {
pollMetrics = traceCounters(format("FastRestoreLoaderMetrics%d", batchIndex), nodeID,
SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY, &counters.cc,
nodeID.toString() + "/RestoreLoaderMetrics/" + std::to_string(batchIndex));
@ -118,7 +119,7 @@ struct LoaderBatchStatus : public ReferenceCounted<LoaderBatchStatus> {
void addref() { return ReferenceCounted<LoaderBatchStatus>::addref(); }
void delref() { return ReferenceCounted<LoaderBatchStatus>::delref(); }
std::string toString() {
std::string toString() const {
std::stringstream ss;
ss << "sendAllRanges: "
<< (!sendAllRanges.present() ? "invalid" : (sendAllRanges.get().isReady() ? "ready" : "notReady"))
@ -128,6 +129,29 @@ struct LoaderBatchStatus : public ReferenceCounted<LoaderBatchStatus> {
}
};
// Scheduler entry for a single loadingParam's send request. One entry exists per loadingParam so that
// the scheduler can choose which version batch's requests are sent first.
struct RestoreLoaderSchedSendLoadParamRequest {
	int batchIndex; // version batch this request belongs to
	Promise<Void> toSched; // NOTE(review): presumably fulfilled by the scheduler to release the request -- confirm
	double start; // tie-breaker within a batch; NOTE(review): looks like a creation timestamp -- confirm

	explicit RestoreLoaderSchedSendLoadParamRequest(int batchIndex, Promise<Void> toSched, double start)
	  : batchIndex(batchIndex), toSched(toSched), start(start) {}
	RestoreLoaderSchedSendLoadParamRequest() = default;

	// Deliberately inverted ordering: an entry compares "less" when it has the larger batchIndex (or, within
	// the same batch, the larger start). In a std::priority_queue, whose top() is the greatest element, this
	// puts the entry with the smallest batchIndex and then the smallest start on top.
	bool operator<(RestoreLoaderSchedSendLoadParamRequest const& rhs) const {
		if (batchIndex != rhs.batchIndex) {
			return batchIndex > rhs.batchIndex;
		}
		return start > rhs.start;
	}

	// Human-readable dump of all fields, used for trace events.
	std::string toString() const {
		std::stringstream ss;
		ss << "RestoreLoaderSchedSendLoadParamRequest: ";
		ss << " batchIndex:" << batchIndex;
		ss << " toSchedFutureIsReady:" << toSched.getFuture().isReady();
		ss << " start:" << start;
		return ss.str();
	}
};
struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoaderData> {
// buffered data per version batch
std::map<int, Reference<LoaderBatchData>> batch;
@ -139,13 +163,32 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
Reference<IBackupContainer> bc; // Backup container is used to read backup files
Key bcUrl; // The url used to get the bc
// Request scheduler
std::priority_queue<RestoreLoadFileRequest> loadingQueue; // request queue of loading files
std::priority_queue<RestoreSendMutationsToAppliersRequest>
sendingQueue; // request queue of sending mutations to appliers
std::priority_queue<RestoreLoaderSchedSendLoadParamRequest> sendLoadParamQueue;
int finishedLoadingVB; // the max version batch index that finished loading file phase
int finishedSendingVB; // the max version batch index that finished sending mutations phase
int inflightSendingReqs; // number of sendingMutations requests released
int inflightLoadingReqs; // number of load backup file requests released
std::map<int, int> inflightSendLoadParamReqs; // key: batchIndex, value: inflightSendLoadParamReqs
Reference<AsyncVar<bool>> hasPendingRequests; // are there pending requests for loader
// addActor: add to actorCollection so that when an actor has error, the ActorCollection can catch the error.
// addActor is used to create the actorCollection when the RestoreController is created
PromiseStream<Future<Void>> addActor;
void addref() { return ReferenceCounted<RestoreLoaderData>::addref(); }
void delref() { return ReferenceCounted<RestoreLoaderData>::delref(); }
explicit RestoreLoaderData(UID loaderInterfID, int assignedIndex, RestoreControllerInterface ci) : ci(ci) {
explicit RestoreLoaderData(UID loaderInterfID, int assignedIndex, RestoreControllerInterface ci)
: ci(ci), finishedLoadingVB(0), finishedSendingVB(0), inflightSendingReqs(0), inflightLoadingReqs(0) {
nodeID = loaderInterfID;
nodeIndex = assignedIndex;
role = RestoreRole::Loader;
hasPendingRequests = Reference<AsyncVar<bool>>(new AsyncVar<bool>(false));
}
~RestoreLoaderData() = default;

View File

@ -81,7 +81,17 @@ void updateProcessStats(Reference<RestoreRoleData> self) {
if (g_network->isSimulated()) {
// memUsage and cpuUsage are not relevant in the simulator,
// and relying on the actual values could break seed determinism
self->cpuUsage = 100.0;
if (deterministicRandom()->random01() < 0.2) { // not fully utilized cpu
self->cpuUsage = deterministicRandom()->random01() * SERVER_KNOBS->FASTRESTORE_SCHED_TARGET_CPU_PERCENT;
} else if (deterministicRandom()->random01() < 0.6) { // achieved target cpu but cpu is not busy
self->cpuUsage = SERVER_KNOBS->FASTRESTORE_SCHED_TARGET_CPU_PERCENT +
deterministicRandom()->random01() * (SERVER_KNOBS->FASTRESTORE_SCHED_MAX_CPU_PERCENT -
SERVER_KNOBS->FASTRESTORE_SCHED_TARGET_CPU_PERCENT);
} else { // reach desired max cpu usage; use max cpu as 200 to simulate incorrect cpu profiling
self->cpuUsage =
SERVER_KNOBS->FASTRESTORE_SCHED_MAX_CPU_PERCENT +
deterministicRandom()->random01() * (200 - SERVER_KNOBS->FASTRESTORE_SCHED_MAX_CPU_PERCENT);
}
self->memory = 100.0;
self->residentMemory = 100.0;
return;
@ -109,7 +119,12 @@ ACTOR Future<Void> isSchedulable(Reference<RestoreRoleData> self, int actorBatch
if (g_network->isSimulated() && BUGGIFY) {
// Intentionally randomly block actors for low memory reason.
// memory will be larger than threshold when deterministicRandom()->random01() > 1/2
memory = SERVER_KNOBS->FASTRESTORE_MEMORY_THRESHOLD_MB_SOFT * 2 * deterministicRandom()->random01();
if (deterministicRandom()->random01() < 0.4) { // enough memory
memory = SERVER_KNOBS->FASTRESTORE_MEMORY_THRESHOLD_MB_SOFT * deterministicRandom()->random01();
} else { // used too much memory, needs throttling
memory = SERVER_KNOBS->FASTRESTORE_MEMORY_THRESHOLD_MB_SOFT +
deterministicRandom()->random01() * SERVER_KNOBS->FASTRESTORE_MEMORY_THRESHOLD_MB_SOFT;
}
}
if (memory < memoryThresholdBytes || self->finishedBatch.get() + 1 == actorBatchIndex) {
if (memory >= memoryThresholdBytes) {

View File

@ -124,6 +124,7 @@ public:
virtual std::string describeNode() = 0;
};
void updateProcessStats(Reference<RestoreRoleData> self);
ACTOR Future<Void> updateProcessMetrics(Reference<RestoreRoleData> self);
ACTOR Future<Void> traceProcessMetrics(Reference<RestoreRoleData> self, std::string role);
ACTOR Future<Void> traceRoleVersionBatchProgress(Reference<RestoreRoleData> self, std::string role);

View File

@ -75,8 +75,9 @@ ACTOR Future<Void> handlerTerminateWorkerRequest(RestoreSimpleRequest req, Refer
// Future: Multiple roles in a restore worker
void handleRecruitRoleRequest(RestoreRecruitRoleRequest req, Reference<RestoreWorkerData> self,
ActorCollection* actors, Database cx) {
// Already recruited a role
// Future: Allow multiple restore roles on a restore worker. The design should easily allow this.
ASSERT(!self->loaderInterf.present() || !self->applierInterf.present()); // Only one role per worker for now
// Already recruited a role
if (self->loaderInterf.present()) {
ASSERT(req.role == RestoreRole::Loader);
req.reply.send(RestoreRecruitRoleReply(self->id(), RestoreRole::Loader, self->loaderInterf.get()));

View File

@ -2575,6 +2575,11 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
DUMPTOKEN( recruited.lock );
DUMPTOKEN( recruited.getQueuingMetrics );
DUMPTOKEN( recruited.confirmRunning );
DUMPTOKEN( recruited.waitFailure );
DUMPTOKEN( recruited.recoveryFinished );
DUMPTOKEN( recruited.disablePopRequest );
DUMPTOKEN( recruited.enablePopRequest );
DUMPTOKEN( recruited.snapRequest );
ProtocolVersion protocolVersion = BinaryReader::fromStringRef<ProtocolVersion>( fProtocolVersions.get()[idx].value, Unversioned() );
TLogSpillType logSpillType = BinaryReader::fromStringRef<TLogSpillType>( fTLogSpillTypes.get()[idx].value, AssumeVersion(protocolVersion) );
@ -2783,6 +2788,11 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
DUMPTOKEN( recruited.lock );
DUMPTOKEN( recruited.getQueuingMetrics );
DUMPTOKEN( recruited.confirmRunning );
DUMPTOKEN( recruited.waitFailure );
DUMPTOKEN( recruited.recoveryFinished );
DUMPTOKEN( recruited.disablePopRequest );
DUMPTOKEN( recruited.enablePopRequest );
DUMPTOKEN( recruited.snapRequest );
stopAllTLogs(self, recruited.id());

View File

@ -498,6 +498,34 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
// Runs one measurement window for the peer at addr: clears the slow/fast reply counters, waits
// PUSH_STATS_INTERVAL, logs the counts accumulated in the meantime, and resets the connection to addr when
// the slow replies are both numerous enough and a large enough share of all replies.
ACTOR static Future<Void> pushResetChecker( Reference<ConnectionResetInfo> self, NetworkAddress addr ) {
	// Begin the window with both counters at zero; recordPushMetrics() increments them while this actor waits.
	self->fastReplies = 0;
	self->slowReplies = 0;

	wait( delay( SERVER_KNOBS->PUSH_STATS_INTERVAL ) );

	TraceEvent("SlowPushStats")
	    .detail("PeerAddress", addr)
	    .detail("SlowReplies", self->slowReplies)
	    .detail("FastReplies", self->fastReplies);

	// Absolute threshold first; the ratio is evaluated only once that threshold is met.
	if( self->slowReplies >= SERVER_KNOBS->PUSH_STATS_SLOW_AMOUNT ) {
		if( self->slowReplies / double( self->slowReplies + self->fastReplies ) >= SERVER_KNOBS->PUSH_STATS_SLOW_RATIO ) {
			FlowTransport::transport().resetConnection(addr);
			// recordPushMetrics() compares against lastReset before counting further replies.
			self->lastReset = now();
		}
	}

	return Void();
}
// Passes through the commit reply from `in` unchanged while classifying its latency for the peer at addr.
// A reply that took longer than PUSH_MAX_LATENCY counts as slow, otherwise as fast; the counts are kept in
// `self` and evaluated by pushResetChecker(). Errors from `in` are not caught here.
ACTOR static Future<TLogCommitReply> recordPushMetrics( Reference<ConnectionResetInfo> self, NetworkAddress addr, Future<TLogCommitReply> in ) {
state double startTime = now();
TLogCommitReply t = wait(in);
// Replies are only counted once more than PUSH_RESET_INTERVAL has passed since the last connection reset.
if(now()-self->lastReset > SERVER_KNOBS->PUSH_RESET_INTERVAL) {
if(now()-startTime > SERVER_KNOBS->PUSH_MAX_LATENCY) {
// A slow reply starts a new measurement window unless one is still running.
if(self->resetCheck.isReady()) {
self->resetCheck = pushResetChecker(self, addr);
}
// NOTE(review): pushResetChecker() zeroes both counters at its start; assuming the actor body runs
// synchronously up to its first wait(), this increment must stay after the call above so that the
// reply which opened the window is counted in it -- confirm before reordering.
self->slowReplies++;
} else {
self->fastReplies++;
}
}
return t;
}
Future<Version> push(Version prevVersion, Version version, Version knownCommittedVersion,
Version minKnownCommittedVersion, LogPushData& data, Optional<UID> debugID) final {
// FIXME: Randomize request order as in LegacyLogSystem?
@ -506,10 +534,15 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
int location = 0;
for(auto& it : tLogs) {
if(it->isLocal && it->logServers.size()) {
if(it->connectionResetTrackers.size() == 0) {
for(int i = 0; i < it->logServers.size(); i++) {
it->connectionResetTrackers.push_back(Reference<ConnectionResetInfo>( new ConnectionResetInfo() ));
}
}
vector<Future<Void>> tLogCommitResults;
for(int loc=0; loc< it->logServers.size(); loc++) {
Standalone<StringRef> msg = data.getMessages(location);
allReplies.push_back( it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( msg.arena(), prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, msg, debugID ), TaskPriority::ProxyTLogCommitReply ) );
allReplies.push_back( recordPushMetrics( it->connectionResetTrackers[loc], it->logServers[loc]->get().interf().address(), it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( msg.arena(), prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, msg, debugID ), TaskPriority::ProxyTLogCommitReply ) ) );
Future<Void> commitSuccess = success(allReplies.back());
addActor.get().send(commitSuccess);
tLogCommitResults.push_back(commitSuccess);

View File

@ -6358,11 +6358,15 @@ ACTOR Future<Void> verify(VersionedBTree* btree, FutureStream<Version> vStream,
committedVersions.pop_front();
}
// Choose a random committed version, or sometimes the latest (which could be ahead of the latest version
// from vStream)
v = (committedVersions.empty() || deterministicRandom()->random01() < 0.25)
? btree->getLastCommittedVersion()
: committedVersions[deterministicRandom()->randomInt(0, committedVersions.size())];
// Continue if the versions list is empty, which won't wait until it reaches the oldest readable
// btree version which will already be in vStream.
if(committedVersions.empty()) {
continue;
}
// Choose a random committed version.
v = committedVersions[deterministicRandom()->randomInt(0, committedVersions.size())];
debug_printf("Using committed version %" PRId64 "\n", v);
// Get a cursor at v so that v doesn't get expired between the possibly serial steps below.
state Reference<IStoreCursor> cur = btree->readAtVersion(v);
@ -7244,6 +7248,7 @@ TEST_CASE("!/redwood/correctness/btree") {
pagerMemoryOnly ? 2e9 : (BUGGIFY ? deterministicRandom()->randomInt(1, 10 * pageSize) : 0);
state Version versionIncrement = deterministicRandom()->randomInt64(1, 1e8);
state Version remapCleanupWindow = deterministicRandom()->randomInt64(0, versionIncrement * 50);
state int maxVerificationMapEntries = 300e3;
printf("\n");
printf("targetPageOps: %" PRId64 "\n", targetPageOps);
@ -7262,6 +7267,7 @@ TEST_CASE("!/redwood/correctness/btree") {
printf("cacheSizeBytes: %s\n", cacheSizeBytes == 0 ? "default" : format("%" PRId64, cacheSizeBytes).c_str());
printf("versionIncrement: %" PRId64 "\n", versionIncrement);
printf("remapCleanupWindow: %" PRId64 "\n", remapCleanupWindow);
printf("maxVerificationMapEntries: %d\n", maxVerificationMapEntries);
printf("\n");
printf("Deleting existing test data...\n");
@ -7299,7 +7305,7 @@ TEST_CASE("!/redwood/correctness/btree") {
state Future<Void> commit = Void();
state int64_t totalPageOps = 0;
while (totalPageOps < targetPageOps) {
while (totalPageOps < targetPageOps && written.size() < maxVerificationMapEntries) {
// Sometimes increment the version
if (deterministicRandom()->random01() < 0.10) {
++version;
@ -7394,8 +7400,8 @@ TEST_CASE("!/redwood/correctness/btree") {
keys.insert(kv.key);
}
// Commit at end or after this commit's mutation bytes are reached
if (totalPageOps >= targetPageOps || mutationBytesThisCommit >= mutationBytesTargetThisCommit) {
// Commit after any limits for this commit or the total test are reached
if (totalPageOps >= targetPageOps || written.size() >= maxVerificationMapEntries || mutationBytesThisCommit >= mutationBytesTargetThisCommit) {
// Wait for previous commit to finish
wait(commit);
printf("Committed. Next commit %d bytes, %" PRId64 " bytes.", mutationBytesThisCommit, mutationBytes.get());
@ -7416,7 +7422,9 @@ TEST_CASE("!/redwood/correctness/btree") {
commit = map(btree->commit(), [=,&ops=totalPageOps](Void) {
// Update pager ops before clearing metrics
ops += g_redwoodMetrics.pageOps();
printf("PageOps %" PRId64 "/%" PRId64 " (%.2f%%)\n", ops, targetPageOps, ops * 100.0 / targetPageOps);
printf("PageOps %" PRId64 "/%" PRId64 " (%.2f%%) VerificationMapEntries %d/%d (%.2f%%)\n",
ops, targetPageOps, ops * 100.0 / targetPageOps,
written.size(), maxVerificationMapEntries, written.size() * 100.0 / maxVerificationMapEntries);
printf("Committed:\n%s\n", g_redwoodMetrics.toString(true).c_str());
// Notify the background verifier that version is committed and therefore readable

View File

@ -46,7 +46,8 @@ ACTOR Future<Void> waitFailureClient(RequestStream<ReplyPromise<Void>> waitFailu
if (!x.present()) {
if (trace) {
TraceEvent("WaitFailureClient")
.detail("FailedEndpoint", waitFailure.getEndpoint().getPrimaryAddress().toString());
.detail("FailedEndpoint", waitFailure.getEndpoint().getPrimaryAddress().toString())
.detail("Token", waitFailure.getEndpoint().token);
}
return Void();
}

View File

@ -1416,6 +1416,11 @@ ACTOR Future<Void> workerServer(
DUMPTOKEN( recruited.lock );
DUMPTOKEN( recruited.getQueuingMetrics );
DUMPTOKEN( recruited.confirmRunning );
DUMPTOKEN( recruited.waitFailure );
DUMPTOKEN( recruited.recoveryFinished );
DUMPTOKEN( recruited.disablePopRequest );
DUMPTOKEN( recruited.enablePopRequest );
DUMPTOKEN( recruited.snapRequest );
errorForwarders.add( zombie(recruited, forwardError( errors, Role::LOG_ROUTER, recruited.id(),
logRouter( recruited, req, dbInfo ) ) ) );

View File

@ -92,6 +92,9 @@ public:
//The API being used by this client
TransactionType transactionType;
// Maximum time to reset DB to the original state
double resetDBTimeout;
ApiCorrectnessWorkload(WorkloadContext const& wcx) : ApiWorkload(wcx), numRandomOperations("Num Random Operations") {
numGets = getOption(options, LiteralStringRef("numGets"), 1000);
numGetRanges = getOption(options, LiteralStringRef("numGetRanges"), 100);
@ -107,6 +110,8 @@ public:
int maxTransactionBytes = getOption(options, LiteralStringRef("maxTransactionBytes"), 500000);
maxKeysPerTransaction = std::max(1, maxTransactionBytes / (maxValueLength + maxLongKeyLength));
resetDBTimeout = getOption(options, LiteralStringRef("resetDBTimeout"), 1800.0);
if(maxTransactionBytes > 500000) {
TraceEvent("RemapEventSeverity").detail("TargetEvent", "LargePacketSent").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
TraceEvent("RemapEventSeverity").detail("TargetEvent", "LargePacketReceived").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
@ -147,9 +152,9 @@ public:
wait(timeout(self->runScriptedTest(self, data), 600, Void()));
if(!self->hasFailed()) {
//Return database to original state (for a maximum of 1800 seconds)
// Return database to original state (for a maximum of resetDBTimeout seconds)
try {
wait(timeoutError(::success(self->runSet(data, self)), 1800));
wait(timeoutError(::success(self->runSet(data, self)), self->resetDBTimeout));
}
catch(Error &e) {
if(e.code() == error_code_timed_out) {

View File

@ -93,7 +93,9 @@ Future<Void> ApiWorkload::start(Database const& cx) {
// Records a failure of this workload: prints `reason` together with clientPrefixInt to stdout, emits
// SevError "TestFailure" trace events whose Reason is built from description() and `reason`, and sets
// `success` to false.
// NOTE(review): two TestFailure events are emitted below that differ only in the " " separator between
// description() and reason; this looks like the before/after pair of a change -- confirm which one should remain.
void ApiWorkload::testFailure(std::string reason)
{
printf("test failure on client %d: %s\n", clientPrefixInt, reason.c_str());
TraceEvent(SevError, "TestFailure").detail("Reason", description() + reason).detail("Workload", "ApiCorrectness");
TraceEvent(SevError, "TestFailure")
.detail("Reason", description() + " " + reason)
.detail("Workload", "ApiCorrectness");
success = false;
}

View File

@ -0,0 +1,73 @@
/*
* LockDatabaseFrequently.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct LockDatabaseFrequentlyWorkload : TestWorkload {
	// Workload in which client 0 repeatedly locks and then unlocks the database until testDuration has
	// elapsed; every other client does nothing. lockCount reports the number of completed lock/unlock cycles.
	double delayBetweenLocks; // test option "delayBetweenLocks", default 0.1
	double testDuration; // test option "testDuration", default 60
	PerfIntCounter lockCount{ "LockCount" };

	LockDatabaseFrequentlyWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
		delayBetweenLocks = getOption(options, LiteralStringRef("delayBetweenLocks"), 0.1);
		testDuration = getOption(options, LiteralStringRef("testDuration"), 60);
	}

	std::string description() override { return "LockDatabaseFrequently"; }

	// Nothing to prepare.
	Future<Void> setup(Database const& cx) override { return Void(); }

	// Only client 0 drives the locking; all other clients finish immediately.
	Future<Void> start(Database const& cx) override {
		if (clientId != 0) {
			return Void();
		}
		return worker(this, cx);
	}

	// This workload performs no verification of its own.
	Future<bool> check(Database const& cx) override { return true; }

	// Only client 0 ran the worker, so only it reports the counter.
	void getMetrics(vector<PerfMetric>& m) override {
		if (clientId != 0) {
			return;
		}
		m.push_back(lockCount.getMetric());
	}

	// Runs lock/unlock cycles back to back. The deadline is checked only after a cycle completes, so at least
	// one full cycle always runs and a cycle in progress is never cut short.
	ACTOR static Future<Void> worker(LockDatabaseFrequentlyWorkload* self, Database cx) {
		state Future<Void> end = delay(self->testDuration);
		state double lastLock = g_network->now();
		// The unlock schedule starts half an interval after the lock schedule.
		state double lastUnlock = g_network->now() + self->delayBetweenLocks / 2;
		loop {
			wait(lockAndUnlock(self, cx, &lastLock, &lastUnlock));
			++self->lockCount;
			if (end.isReady()) {
				break;
			}
		}
		return Void();
	}

	// One cycle: lock the database under a fresh random UID, then unlock it with the same UID. Each step also
	// waits on poisson() with the matching timestamp.
	// NOTE(review): poisson() presumably paces the steps with mean delayBetweenLocks and advances the
	// timestamp it is given -- confirm against its definition.
	ACTOR static Future<Void> lockAndUnlock(LockDatabaseFrequentlyWorkload* self, Database cx, double* lastLock,
	                                        double* lastUnlock) {
		state UID uid = deterministicRandom()->randomUniqueID();
		wait(lockDatabase(cx, uid) && poisson(lastLock, self->delayBetweenLocks));
		wait(unlockDatabase(cx, uid) && poisson(lastUnlock, self->delayBetweenLocks));
		return Void();
	}
};
WorkloadFactory<LockDatabaseFrequentlyWorkload> LockDatabaseFrequentlyWorkloadFactory("LockDatabaseFrequently");

View File

@ -27,6 +27,10 @@
#include "fdbserver/ServerDBInfo.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// Choose a random proxy and a random tLog; the chosen tLog is referred to as uncloggedTLog.
// The workload first clogs the network links between the chosen proxy and all tLogs except uncloggedTLog.
// While the network is still clogged, the workload kills the proxy and clogs uncloggedTLog's interface.
// Note: The clogged network link's latency will become "clogDuration".
struct RollbackWorkload : TestWorkload {
bool enableFailures, multiple, enabled;
double meanDelay, clogDuration, testDuration;
@ -80,19 +84,21 @@ struct RollbackWorkload : TestWorkload {
.detail("Proxy", proxy.address())
.detail("UncloggedTLog", uncloggedTLog);
for(int t=0; t<tlogs.size(); t++)
if (t != utIndex)
for (int t = 0; t < tlogs.size(); t++) {
if (t != utIndex) {
g_simulator.clogPair(
proxy.address().ip,
tlogs[t].address().ip,
self->clogDuration );
//g_simulator.clogInterface( g_simulator.getProcess( system.tlogs[t].commit.getEndpoint() ), self->clogDuration, ClogAll );
}
}
// While the clogged machines are still clogged...
wait( delay( self->clogDuration/3 ) );
system = self->dbInfo->get();
// Kill the proxy and the unclogged tlog
// Kill the proxy and clog the unclogged tlog
if (self->enableFailures) {
g_simulator.killProcess( g_simulator.getProcessByAddress( proxy.address() ), ISimulator::KillInstantly );
g_simulator.clogInterface( uncloggedTLog.ip, self->clogDuration, ClogAll );

View File

@ -347,7 +347,7 @@ struct VersionStampWorkload : TestWorkload {
}
catch (Error &e) {
err = e;
if (err.code() == error_code_database_locked) {
if (err.code() == error_code_database_locked && g_simulator.extraDB != nullptr) {
//TraceEvent("VST_CommitDatabaseLocked");
cx_is_primary = !cx_is_primary;
tr = ReadYourWritesTransaction(cx_is_primary ? cx : extraDB);

View File

@ -682,7 +682,7 @@ inline bool operator==(const StringRef& lhs, const StringRef& rhs) {
if (lhs.size() == 0 && rhs.size() == 0) {
return true;
}
return lhs.size() == rhs.size() && !memcmp(lhs.begin(), rhs.begin(), lhs.size());
return lhs.size() == rhs.size() && memcmp(lhs.begin(), rhs.begin(), lhs.size()) == 0;
}
inline bool operator<(const StringRef& lhs, const StringRef& rhs) {
if (std::min(lhs.size(), rhs.size()) > 0) {

View File

@ -76,6 +76,7 @@ void FlowKnobs::initialize(bool randomize, bool isSimulated) {
init( MAX_RECONNECTION_TIME, 0.5 );
init( RECONNECTION_TIME_GROWTH_RATE, 1.2 );
init( RECONNECTION_RESET_TIME, 5.0 );
init( ALWAYS_ACCEPT_DELAY, 15.0 );
init( ACCEPT_BATCH_SIZE, 10 );
init( TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY, 5.0 );
init( TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT, 20.0 );
@ -210,10 +211,10 @@ void FlowKnobs::initialize(bool randomize, bool isSimulated) {
init( FUTURE_VERSION_BACKOFF_GROWTH, 2.0 );
init( LOAD_BALANCE_MAX_BAD_OPTIONS, 1 ); //should be the same as MAX_MACHINES_FALLING_BEHIND
init( LOAD_BALANCE_PENALTY_IS_BAD, true );
init( BASIC_LOAD_BALANCE_UPDATE_RATE, 2.0 );
init( BASIC_LOAD_BALANCE_MAX_CHANGE, 0.05 );
init( BASIC_LOAD_BALANCE_UPDATE_RATE, 10.0 ); //should be longer than the rate we log network metrics
init( BASIC_LOAD_BALANCE_MAX_CHANGE, 0.10 );
init( BASIC_LOAD_BALANCE_MAX_PROB, 2.0 );
init( BASIC_LOAD_BALANCE_BUCKETS, 40 );
init( BASIC_LOAD_BALANCE_MIN_AMOUNT, 50000 ); //Will not update probabilities if the average proxy busyness is less than 5%
// Health Monitor
init( FAILURE_DETECTION_DELAY, 4.0 ); if( randomize && BUGGIFY ) FAILURE_DETECTION_DELAY = 1.0;

View File

@ -93,6 +93,7 @@ public:
double MAX_RECONNECTION_TIME;
double RECONNECTION_TIME_GROWTH_RATE;
double RECONNECTION_RESET_TIME;
double ALWAYS_ACCEPT_DELAY;
int ACCEPT_BATCH_SIZE;
double INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING;
@ -232,7 +233,7 @@ public:
double BASIC_LOAD_BALANCE_UPDATE_RATE;
double BASIC_LOAD_BALANCE_MAX_CHANGE;
double BASIC_LOAD_BALANCE_MAX_PROB;
int BASIC_LOAD_BALANCE_BUCKETS;
double BASIC_LOAD_BALANCE_MIN_AMOUNT;
// Health Monitor
int FAILURE_DETECTION_DELAY;

View File

@ -166,6 +166,7 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta
n.detail(format("PriorityBusy%d", itr.first).c_str(), itr.second);
}
bool firstTracker = true;
for (auto &itr : g_network->networkInfo.metrics.starvationTrackers) {
if(itr.active) {
itr.duration += now() - itr.windowedTimer;
@ -176,6 +177,11 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta
n.detail(format("PriorityStarvedBelow%d", itr.priority).c_str(), std::min(currentStats.elapsed, itr.duration));
n.detail(format("PriorityMaxStarvedBelow%d", itr.priority).c_str(), itr.maxDuration);
if(firstTracker) {
g_network->networkInfo.metrics.lastRunLoopBusyness = std::min(currentStats.elapsed, itr.duration)/currentStats.elapsed;
firstTracker = false;
}
itr.duration = 0;
itr.maxDuration = 0;
}

View File

@ -342,6 +342,7 @@ struct NetworkMetrics {
};
std::unordered_map<TaskPriority, struct PriorityStats> activeTrackers;
double lastRunLoopBusyness;
std::vector<struct PriorityStats> starvationTrackers;
static const std::vector<int> starvationBins;

View File

@ -32,7 +32,7 @@
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
<Product Name='$(var.Title)'
Id='{409BDCD0-ECF7-4CCA-A3F9-EEEAF0C79A42}'
Id='{707FC06F-9954-4A7E-AC9C-A52C99AE776D}'
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
Version='$(var.Version)'
Manufacturer='$(var.Manufacturer)'

View File

@ -40,3 +40,6 @@ testTitle = 'Clogged'
machinesToLeave = 3
reboot = true
testDuration = 10.0
[[test.workload]]
testName = 'LockDatabaseFrequently'

View File

@ -4,3 +4,6 @@ testTitle = 'VersionStamp'
[[test.workload]]
testName = 'VersionStamp'
soleOwnerOfMetadataVersionKey = true
[[test.workload]]
testName = 'LockDatabaseFrequently'

View File

@ -25,6 +25,7 @@ runSetup = true
numClearRanges = 10
maxTransactionBytes = 500000
randomTestDuration = 60
resetDBTimeout = 7200
[[test.workload]]
testName = 'AtomicRestore'

View File

@ -10,8 +10,8 @@ timeout = 360000
[[test.workload]]
testName = 'Cycle'
# nodeCount=30000
nodeCount = 1000
nodeCount=30000
# nodeCount = 1000
# transactionsPerSecond=500.0
transactionsPerSecond = 2500.0
testDuration = 30.0