Merge branch 'master' into mengxu/fr-code-improvement-PR
This commit is contained in:
@ -504,4 +504,16 @@ Armon Dadgar (ART)
Copyright (C) 2009 The Guava Authors
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
in compliance with the License. You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied. See the License for the specific language governing permissions and limitations under
the License.
@ -56,6 +56,7 @@ set(JAVA_BINDING_SRCS
@ -36,6 +36,11 @@ static JavaVM* g_jvm = nullptr;
static thread_local JNIEnv* g_thread_jenv = nullptr; // Defined for the network thread once it is running, and for any thread that has called registerCallback
static thread_local jmethodID g_IFutureCallback_call_methodID = JNI_NULL;
static thread_local bool is_external = false;
static jclass range_result_summary_class;
static jclass range_result_class;
static jclass string_class;
static jmethodID range_result_init;
static jmethodID range_result_summary_init;
void detachIfExternalThread(void *ignore) {
if(is_external && g_thread_jenv != nullptr) {
@ -275,10 +280,9 @@ JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureStrings_FutureString
return JNI_NULL;
jclass str_clazz = jenv->FindClass("java/lang/String");
if( jenv->ExceptionOccurred() )
return JNI_NULL;
jobjectArray arr = jenv->NewObjectArray(count, str_clazz, JNI_NULL);
jobjectArray arr = jenv->NewObjectArray(count, string_class, JNI_NULL);
if( !arr ) {
if( !jenv->ExceptionOccurred() )
@ -306,13 +310,6 @@ JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureResults_FutureResult
return JNI_NULL;
jclass resultCls = jenv->FindClass("com/apple/foundationdb/RangeResultSummary");
if( jenv->ExceptionOccurred() )
return JNI_NULL;
jmethodID resultCtorId = jenv->GetMethodID(resultCls, "<init>", "([BIZ)V");
if( jenv->ExceptionOccurred() )
return JNI_NULL;
FDBFuture *f = (FDBFuture *)future;
@ -337,7 +334,7 @@ JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureResults_FutureResult
jenv->SetByteArrayRegion(lastKey, 0, kvs[count - 1].key_length, (jbyte *)kvs[count - 1].key);
jobject result = jenv->NewObject(resultCls, resultCtorId, lastKey, count, (jboolean)more);
jobject result = jenv->NewObject(range_result_summary_class, range_result_summary_init, lastKey, count, (jboolean)more);
if( jenv->ExceptionOccurred() )
return JNI_NULL;
@ -350,9 +347,6 @@ JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureResults_FutureResult
return JNI_NULL;
jclass resultCls = jenv->FindClass("com/apple/foundationdb/RangeResult");
jmethodID resultCtorId = jenv->GetMethodID(resultCls, "<init>", "([B[IZ)V");
FDBFuture *f = (FDBFuture *)future;
@ -414,7 +408,7 @@ JNIEXPORT jobject JNICALL Java_com_apple_foundationdb_FutureResults_FutureResult
jenv->ReleaseByteArrayElements(keyValueArray, (jbyte *)keyvalues_barr, 0);
jenv->ReleaseIntArrayElements(lengthArray, length_barr, 0);
jobject result = jenv->NewObject(resultCls, resultCtorId, keyValueArray, lengthArray, (jboolean)more);
jobject result = jenv->NewObject(range_result_class, range_result_init, keyValueArray, lengthArray, (jboolean)more);
if( jenv->ExceptionOccurred() )
return JNI_NULL;
@ -1042,8 +1036,43 @@ JNIEXPORT void JNICALL Java_com_apple_foundationdb_FDB_Network_1stop(JNIEnv *jen
jint JNI_OnLoad(JavaVM *vm, void *reserved) {
JNIEnv *env;
g_jvm = vm;
return JNI_VERSION_1_1;
if (vm->GetEnv((void**)&env, JNI_VERSION_1_6) != JNI_OK) {
return JNI_ERR;
} else {
jclass local_range_result_class = env->FindClass("com/apple/foundationdb/RangeResult");
range_result_init = env->GetMethodID(local_range_result_class, "<init>", "([B[IZ)V");
range_result_class = (jclass) (env)->NewGlobalRef(local_range_result_class);
jclass local_range_result_summary_class = env->FindClass("com/apple/foundationdb/RangeResultSummary");
range_result_summary_init = env->GetMethodID(local_range_result_summary_class, "<init>", "([BIZ)V");
range_result_summary_class = (jclass) (env)->NewGlobalRef(local_range_result_summary_class);
jclass local_string_class = env->FindClass("java/lang/String");
string_class = (jclass) (env)->NewGlobalRef(local_string_class);
return JNI_VERSION_1_6;
// Is automatically called once the Classloader is destroyed
void JNI_OnUnload(JavaVM *vm, void *reserved) {
JNIEnv* env;
if (vm->GetEnv((void**)&env, JNI_VERSION_1_6) != JNI_OK) {
} else {
// delete global references so the GC can collect them
if (range_result_summary_class != NULL) {
if (range_result_class != NULL) {
if (string_class != NULL) {
#ifdef __cplusplus
@ -304,4 +304,58 @@ public class ArrayUtilTests {
fail("Not yet implemented");
private static final int SAMPLE_COUNT = 1000000;
private static final int SAMPLE_MAX_SIZE = 2048;
private List<byte[]> unsafe;
private List<byte[]> java;
public void init() {
unsafe = new ArrayList(SAMPLE_COUNT);
java = new ArrayList(SAMPLE_COUNT);
Random random = new Random();
for (int i = 0; i <= SAMPLE_COUNT; i++) {
byte[] addition = new byte[random.nextInt(SAMPLE_MAX_SIZE)];
public void testComparatorSort() {
Collections.sort(unsafe, FastByteComparisons.lexicographicalComparerUnsafeImpl());
Collections.sort(java, FastByteComparisons.lexicographicalComparerJavaImpl());
public void testUnsafeComparison() {
for (int i =0; i< SAMPLE_COUNT; i++) {
Assert.assertEquals(FastByteComparisons.lexicographicalComparerUnsafeImpl().compare(unsafe.get(i), java.get(i)), 0);
public void testJavaComparison() {
for (int i =0; i< SAMPLE_COUNT; i++) {
Assert.assertEquals(FastByteComparisons.lexicographicalComparerJavaImpl().compare(unsafe.get(i), java.get(i)), 0);
public void testUnsafeComparisonWithOffet() {
for (int i =0; i< SAMPLE_COUNT; i++) {
if (unsafe.get(i).length > 5)
Assert.assertEquals(FastByteComparisons.lexicographicalComparerUnsafeImpl().compareTo(unsafe.get(i), 4, unsafe.get(i).length - 4, java.get(i), 4, java.get(i).length - 4), 0);
public void testJavaComparisonWithOffset() {
for (int i =0; i< SAMPLE_COUNT; i++) {
if (unsafe.get(i).length > 5)
Assert.assertEquals(FastByteComparisons.lexicographicalComparerJavaImpl().compareTo(unsafe.get(i), 4, unsafe.get(i).length - 4, java.get(i), 4, java.get(i).length - 4), 0);
@ -34,7 +34,7 @@ import;
* {@link #printable(byte[])} for debugging non-text keys and values.
public class ByteArrayUtil {
public class ByteArrayUtil extends FastByteComparisons {
* Joins a set of byte arrays into a larger array. The {@code interlude} is placed
@ -135,11 +135,7 @@ public class ByteArrayUtil {
if(src.length < start + pattern.length)
return false;
for(int i = 0; i < pattern.length; i++)
if(pattern[i] != src[start + i])
return false;
return true;
return compareTo(src, start, pattern.length, pattern, 0, pattern.length) == 0;
@ -307,14 +303,7 @@ public class ByteArrayUtil {
* {@code r}.
public static int compareUnsigned(byte[] l, byte[] r) {
for(int idx = 0; idx < l.length && idx < r.length; ++idx) {
if(l[idx] != r[idx]) {
return (l[idx] & 0xFF) < (r[idx] & 0xFF) ? -1 : 1;
if(l.length == r.length)
return 0;
return l.length < r.length ? -1 : 1;
return compareTo(l, 0, l.length, r, 0, r.length);
@ -328,15 +317,11 @@ public class ByteArrayUtil {
* @return {@code true} if {@code array} starts with {@code prefix}
public static boolean startsWith(byte[] array, byte[] prefix) {
// Short Circuit
if(array.length < prefix.length) {
return false;
for(int i = 0; i < prefix.length; ++i) {
if(prefix[i] != array[i]) {
return false;
return true;
return compareTo(array, 0, prefix.length, prefix, 0, prefix.length) == 0;
@ -0,0 +1,294 @@
* This source file is part of the FoundationDB open source project
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import java.lang.reflect.Field;
import java.nio.ByteOrder;
import java.util.Comparator;
import sun.misc.Unsafe;
* Utility code to do optimized byte-array comparison.
* This is borrowed and slightly modified from Guava's {@link UnsignedBytes}
* class to be able to compare arrays that start at non-zero offsets.
abstract class FastByteComparisons {
private static final int UNSIGNED_MASK = 0xFF;
* Lexicographically compare two byte arrays.
* @param buffer1 left operand, expected to not be null
* @param buffer2 right operand, expected to not be null
* @param offset1 Where to start comparing in the left buffer, expected to be >= 0
* @param offset2 Where to start comparing in the right buffer, expected to be >= 0
* @param length1 How much to compare from the left buffer, expected to be >= 0
* @param length2 How much to compare from the right buffer, expected to be >= 0
* @return 0 if equal, < 0 if left is less than right, etc.
public static int compareTo(byte[] buffer1, int offset1, int length1,
byte[] buffer2, int offset2, int length2) {
return LexicographicalComparerHolder.BEST_COMPARER.compareTo(
buffer1, offset1, length1, buffer2, offset2, length2);
* Interface for both the java and unsafe comparators + offset based comparisons.
* @param <T>
interface Comparer<T> extends Comparator<T> {
* Lexicographically compare two byte arrays.
* @param buffer1 left operand
* @param buffer2 right operand
* @param offset1 Where to start comparing in the left buffer
* @param offset2 Where to start comparing in the right buffer
* @param length1 How much to compare from the left buffer
* @param length2 How much to compare from the right buffer
* @return 0 if equal, < 0 if left is less than right, etc.
abstract public int compareTo(T buffer1, int offset1, int length1,
T buffer2, int offset2, int length2);
* Pure Java Comparer
* @return
static Comparer<byte[]> lexicographicalComparerJavaImpl() {
return LexicographicalComparerHolder.PureJavaComparer.INSTANCE;
* Unsafe Comparer
* @return
static Comparer<byte[]> lexicographicalComparerUnsafeImpl() {
return LexicographicalComparerHolder.UnsafeComparer.INSTANCE;
* Provides a lexicographical comparer implementation; either a Java
* implementation or a faster implementation based on {@link Unsafe}.
* <p>Uses reflection to gracefully fall back to the Java implementation if
* {@code Unsafe} isn't available.
private static class LexicographicalComparerHolder {
static final String UNSAFE_COMPARER_NAME =
LexicographicalComparerHolder.class.getName() + "$UnsafeComparer";
static final Comparer<byte[]> BEST_COMPARER = getBestComparer();
* Returns the Unsafe-using Comparer, or falls back to the pure-Java
* implementation if unable to do so.
static Comparer<byte[]> getBestComparer() {
String arch = System.getProperty("os.arch");
boolean unaligned = arch.equals("i386") || arch.equals("x86")
|| arch.equals("amd64") || arch.equals("x86_64");
if (!unaligned)
return lexicographicalComparerJavaImpl();
try {
Class<?> theClass = Class.forName(UNSAFE_COMPARER_NAME);
// yes, UnsafeComparer does implement Comparer<byte[]>
Comparer<byte[]> comparer =
(Comparer<byte[]>) theClass.getEnumConstants()[0];
return comparer;
} catch (Throwable t) { // ensure we really catch *everything*
return lexicographicalComparerJavaImpl();
* Java Comparer doing byte by byte comparisons
enum PureJavaComparer implements Comparer<byte[]> {
* CompareTo looking at two buffers.
* @param buffer1 left operand
* @param buffer2 right operand
* @param offset1 Where to start comparing in the left buffer
* @param offset2 Where to start comparing in the right buffer
* @param length1 How much to compare from the left buffer
* @param length2 How much to compare from the right buffer
* @return 0 if equal, < 0 if left is less than right, etc.
public int compareTo(byte[] buffer1, int offset1, int length1,
byte[] buffer2, int offset2, int length2) {
// Short circuit equal case
if (buffer1 == buffer2 &&
offset1 == offset2 &&
length1 == length2) {
return 0;
int end1 = offset1 + length1;
int end2 = offset2 + length2;
for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
int a = (buffer1[i] & UNSIGNED_MASK);
int b = (buffer2[j] & UNSIGNED_MASK);
if (a != b) {
return a - b;
return length1 - length2;
* Supports Comparator
* @param o1
* @param o2
* @return comparison
public int compare(byte[] o1, byte[] o2) {
return compareTo(o1, 0, o1.length, o2, 0, o2.length);
* Takes advantage of word based comparisons
@SuppressWarnings("unused") // used via reflection
enum UnsafeComparer implements Comparer<byte[]> {
static final Unsafe theUnsafe;
* The offset to the first element in a byte array.
static final int BYTE_ARRAY_BASE_OFFSET;
public int compare(byte[] o1, byte[] o2) {
return compareTo(o1, 0, o1.length, o2, 0, o2.length);
static {
theUnsafe = (Unsafe) AccessController.doPrivileged(
(PrivilegedAction<Object>) () -> {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
return f.get(null);
} catch (NoSuchFieldException e) {
// It doesn't matter what we throw;
// it's swallowed in getBestComparer().
throw new Error();
} catch (IllegalAccessException e) {
throw new Error();
BYTE_ARRAY_BASE_OFFSET = theUnsafe.arrayBaseOffset(byte[].class);
// sanity check - this should never fail
if (theUnsafe.arrayIndexScale(byte[].class) != 1) {
throw new AssertionError();
static final boolean LITTLE_ENDIAN =
* Lexicographically compare two arrays.
* @param buffer1 left operand
* @param buffer2 right operand
* @param offset1 Where to start comparing in the left buffer
* @param offset2 Where to start comparing in the right buffer
* @param length1 How much to compare from the left buffer
* @param length2 How much to compare from the right buffer
* @return 0 if equal, < 0 if left is less than right, etc.
public int compareTo(byte[] buffer1, int offset1, int length1,
byte[] buffer2, int offset2, int length2) {
// Short circuit equal case
if (buffer1 == buffer2 &&
offset1 == offset2 &&
length1 == length2) {
return 0;
final int stride = 8;
final int minLength = Math.min(length1, length2);
int strideLimit = minLength & ~(stride - 1);
final long offset1Adj = offset1 + BYTE_ARRAY_BASE_OFFSET;
final long offset2Adj = offset2 + BYTE_ARRAY_BASE_OFFSET;
int i;
* Compare 8 bytes at a time. Benchmarking on x86 shows a stride of 8 bytes is no slower
* than 4 bytes even on 32-bit. On the other hand, it is substantially faster on 64-bit.
for (i = 0; i < strideLimit; i += stride) {
long lw = theUnsafe.getLong(buffer1, offset1Adj + i);
long rw = theUnsafe.getLong(buffer2, offset2Adj + i);
if (lw != rw) {
return ((lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE)) ? -1 : 1;
* We want to compare only the first index where left[index] != right[index]. This
* corresponds to the least significant nonzero byte in lw ^ rw, since lw and rw are
* little-endian. Long.numberOfTrailingZeros(diff) tells us the least significant
* nonzero bit, and zeroing out the first three bits of L.nTZ gives us the shift to get
* that least significant nonzero byte. This comparison logic is based on UnsignedBytes
* comparator from guava v21
int n = Long.numberOfTrailingZeros(lw ^ rw) & ~0x7;
return ((int) ((lw >>> n) & UNSIGNED_MASK)) - ((int) ((rw >>> n) & UNSIGNED_MASK));
// The epilogue to cover the last (minLength % stride) elements.
for (; i < minLength; i++) {
int a = (buffer1[offset1 + i] & UNSIGNED_MASK);
int b = (buffer2[offset2 + i] & UNSIGNED_MASK);
if (a != b) {
return a - b;
return length1 - length2;
@ -75,7 +75,7 @@ sudo docker run --rm `# delete (temporary) image after return` \\
${image} "\$@"
cat <<EOF $HOME/bin/clangd
cat <<EOF > $HOME/bin/clangd
fdb-dev scl enable devtoolset-8 rh-python36 rh-ruby24 -- clangd
@ -320,9 +320,14 @@ set(CPACK_RPM_SERVER-EL7_USER_FILELIST
"%config(noreplace) /etc/foundationdb/foundationdb.conf"
"%attr(0700,foundationdb,foundationdb) /var/log/foundationdb"
"%attr(0700, foundationdb, foundationdb) /var/lib/foundationdb")
set(CPACK_RPM_CLIENTS-EL6_USER_FILELIST "%dir /etc/foundationdb")
set(CPACK_RPM_CLIENTS-EL7_USER_FILELIST "%dir /etc/foundationdb")
@ -934,5 +934,10 @@ struct StringRefReader {
Error failure_error;
namespace fileBackup {
ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeRangeFileBlock(Reference<IAsyncFile> file, int64_t offset,
int len);
#include "flow/unactorcompiler.h"
@ -20,6 +20,7 @@
#include "fdbclient/BackupContainer.h"
#include "fdbclient/"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/JsonBuilder.h"
#include "flow/Trace.h"
#include "flow/UnitTest.h"
@ -424,9 +425,11 @@ public:
// TODO: Do this more efficiently, as the range file list for a snapshot could potentially be hundreds of megabytes.
ACTOR static Future<std::vector<RangeFile>> readKeyspaceSnapshot_impl(Reference<BackupContainerFileSystem> bc, KeyspaceSnapshotFile snapshot) {
ACTOR static Future<std::pair<std::vector<RangeFile>, std::map<std::string, KeyRange>>> readKeyspaceSnapshot_impl(
Reference<BackupContainerFileSystem> bc, KeyspaceSnapshotFile snapshot) {
// Read the range file list for the specified version range, and then index them by fileName.
// This is so we can verify that each of the files listed in the manifest file are also in the container at this time.
// This is so we can verify that each of the files listed in the manifest file are also in the container at this
// time.
std::vector<RangeFile> files = wait(bc->listRangeFiles(snapshot.beginVersion, snapshot.endVersion));
state std::map<std::string, RangeFile> rangeIndex;
for(auto &f : files)
@ -482,15 +485,38 @@ public:
throw restore_missing_data();
return results;
// Check key ranges for files
std::map<std::string, KeyRange> fileKeyRanges;
JSONDoc ranges = doc.subDoc("keyRanges"); // Create an empty doc if not existed
for (auto i : ranges.obj()) {
const std::string& filename = i.first;
JSONDoc fields(i.second);
std::string begin, end;
if (fields.tryGet("beginKey", begin) && fields.tryGet("endKey", end)) {
.detail("File", filename)
.detail("Begin", printable(StringRef(begin)))
.detail("End", printable(StringRef(end)));
fileKeyRanges.emplace(filename, KeyRange(KeyRangeRef(StringRef(begin), StringRef(end))));
} else {
TraceEvent("MalFormattedManifest").detail("Key", filename);
throw restore_corrupted_data();
return std::make_pair(results, fileKeyRanges);
Future<std::vector<RangeFile>> readKeyspaceSnapshot(KeyspaceSnapshotFile snapshot) {
Future<std::pair<std::vector<RangeFile>, std::map<std::string, KeyRange>>> readKeyspaceSnapshot(
KeyspaceSnapshotFile snapshot) {
return readKeyspaceSnapshot_impl(Reference<BackupContainerFileSystem>::addRef(this), snapshot);
ACTOR static Future<Void> writeKeyspaceSnapshotFile_impl(Reference<BackupContainerFileSystem> bc, std::vector<std::string> fileNames, int64_t totalBytes) {
ACTOR static Future<Void> writeKeyspaceSnapshotFile_impl(Reference<BackupContainerFileSystem> bc,
std::vector<std::string> fileNames,
std::vector<std::pair<Key, Key>> beginEndKeys,
int64_t totalBytes) {
ASSERT(!fileNames.empty() && fileNames.size() == beginEndKeys.size());
state Version minVer = std::numeric_limits<Version>::max();
state Version maxVer = 0;
@ -521,6 +547,13 @@ public:
doc.create("beginVersion") = minVer;
doc.create("endVersion") = maxVer;
auto ranges = doc.subDoc("keyRanges");
for (int i = 0; i < beginEndKeys.size(); i++) {
auto fileDoc = ranges.subDoc(fileNames[i], /*split=*/false);
fileDoc.create("beginKey") = beginEndKeys[i].first.toString();
fileDoc.create("endKey") = beginEndKeys[i].second.toString();
state std::string docString = json_spirit::write_string(json);
@ -531,8 +564,11 @@ public:
return Void();
Future<Void> writeKeyspaceSnapshotFile(std::vector<std::string> fileNames, int64_t totalBytes) final {
return writeKeyspaceSnapshotFile_impl(Reference<BackupContainerFileSystem>::addRef(this), fileNames, totalBytes);
Future<Void> writeKeyspaceSnapshotFile(const std::vector<std::string>& fileNames,
const std::vector<std::pair<Key, Key>>& beginEndKeys,
int64_t totalBytes) final {
return writeKeyspaceSnapshotFile_impl(Reference<BackupContainerFileSystem>::addRef(this), fileNames,
beginEndKeys, totalBytes);
// List log files, unsorted, which contain data at any version >= beginVersion and <= targetVersion.
@ -1193,7 +1229,10 @@ public:
std::vector<LogFile> filtered;
int i = 0;
for (int j = 1; j < logs.size(); j++) {
if (logs[j].isSubset(logs[i])) continue;
if (logs[j].isSubset(logs[i])) {
ASSERT(logs[j].fileSize <= logs[i].fileSize);
if (!logs[i].isSubset(logs[j])) {
@ -1249,6 +1288,7 @@ public:
// filter out if indices.back() is subset of files[i] or vice versa
if (!indices.empty()) {
if (logs[indices.back()].isSubset(logs[i])) {
ASSERT(logs[indices.back()].fileSize <= logs[i].fileSize);
indices.back() = i;
} else if (!logs[i].isSubset(logs[indices.back()])) {
@ -1291,6 +1331,29 @@ public:
return end;
ACTOR static Future<KeyRange> getSnapshotFileKeyRange_impl(Reference<BackupContainerFileSystem> bc,
RangeFile file) {
state Reference<IAsyncFile> inFile = wait(bc->readFile(file.fileName));
state bool beginKeySet = false;
state Key beginKey;
state Key endKey;
state int64_t j = 0;
for (; j < file.fileSize; j += file.blockSize) {
int64_t len = std::min<int64_t>(file.blockSize, file.fileSize - j);
Standalone<VectorRef<KeyValueRef>> blockData = wait(fileBackup::decodeRangeFileBlock(inFile, j, len));
if (!beginKeySet) {
beginKey = blockData.front().key;
endKey = blockData.back().key;
return KeyRange(KeyRangeRef(beginKey, endKey));
Future<KeyRange> getSnapshotFileKeyRange(const RangeFile& file) final {
return getSnapshotFileKeyRange_impl(Reference<BackupContainerFileSystem>::addRef(this), file);
ACTOR static Future<Optional<RestorableFileSet>> getRestoreSet_impl(Reference<BackupContainerFileSystem> bc, Version targetVersion) {
// Find the most recent keyrange snapshot to end at or before targetVersion
state Optional<KeyspaceSnapshotFile> snapshot;
@ -1305,12 +1368,26 @@ public:
restorable.snapshot = snapshot.get();
restorable.targetVersion = targetVersion;
std::vector<RangeFile> ranges = wait(bc->readKeyspaceSnapshot(snapshot.get()));
restorable.ranges = ranges;
std::pair<std::vector<RangeFile>, std::map<std::string, KeyRange>> results =
restorable.ranges = std::move(results.first);
restorable.keyRanges = std::move(results.second);
if (g_network->isSimulated()) {
// Sanity check key ranges
state std::map<std::string, KeyRange>::iterator rit;
for (rit = restorable.keyRanges.begin(); rit != restorable.keyRanges.end(); rit++) {
auto it = std::find_if(restorable.ranges.begin(), restorable.ranges.end(),
[file = rit->first](const RangeFile f) { return f.fileName == file; });
ASSERT(it != restorable.ranges.end());
KeyRange result = wait(bc->getSnapshotFileKeyRange(*it));
ASSERT(rit->second.begin <= result.begin && rit->second.end >= result.end);
// No logs needed if there is a complete key space snapshot at the target version.
if (snapshot.get().beginVersion == snapshot.get().endVersion &&
snapshot.get().endVersion == targetVersion) {
restorable.continuousBeginVersion = restorable.continuousEndVersion = invalidVersion;
return Optional<RestorableFileSet>(restorable);
@ -1335,6 +1412,8 @@ public:
// sort by version order again for continuous analysis
std::sort(restorable.logs.begin(), restorable.logs.end());
if (isPartitionedLogsContinuous(restorable.logs, snapshot.get().beginVersion, targetVersion)) {
restorable.continuousBeginVersion = snapshot.get().beginVersion;
restorable.continuousEndVersion = targetVersion + 1; // not inclusive
return Optional<RestorableFileSet>(restorable);
return Optional<RestorableFileSet>();
@ -1348,6 +1427,8 @@ public:
Version end = logs.begin()->endVersion;
computeRestoreEndVersion(logs, &restorable.logs, &end, targetVersion);
if (end >= targetVersion) {
restorable.continuousBeginVersion = logs.begin()->beginVersion;
restorable.continuousEndVersion = end;
return Optional<RestorableFileSet>(restorable);
@ -2070,6 +2151,7 @@ ACTOR Future<Void> testBackupContainer(std::string url) {
state std::vector<Future<Void>> writes;
state std::map<Version, std::vector<std::string>> snapshots;
state std::map<Version, int64_t> snapshotSizes;
state std::map<Version, std::vector<std::pair<Key, Key>>> snapshotBeginEndKeys;
state int nRangeFiles = 0;
state std::map<Version, std::string> logs;
state Version v = deterministicRandom()->randomInt64(0, std::numeric_limits<Version>::max() / 2);
@ -2084,6 +2166,7 @@ ACTOR Future<Void> testBackupContainer(std::string url) {
while(kvfiles > 0) {
if(snapshots.empty()) {
snapshots[v] = {};
snapshotBeginEndKeys[v] = {};
snapshotSizes[v] = 0;
if(deterministicRandom()->coinflip()) {
v = nextVersion(v);
@ -2093,14 +2176,17 @@ ACTOR Future<Void> testBackupContainer(std::string url) {
v = nextVersion(v);
snapshotBeginEndKeys.rbegin()->second.emplace_back(LiteralStringRef(""), LiteralStringRef(""));
int size = chooseFileSize(fileSizes);
snapshotSizes.rbegin()->second += size;
writes.push_back(writeAndVerifyFile(c, range, size));
if(deterministicRandom()->random01() < .2) {
writes.push_back(c->writeKeyspaceSnapshotFile(snapshots.rbegin()->second, snapshotSizes.rbegin()->second));
snapshots.rbegin()->second, snapshotBeginEndKeys.rbegin()->second, snapshotSizes.rbegin()->second));
snapshots[v] = {};
snapshotBeginEndKeys[v] = {};
snapshotSizes[v] = 0;
@ -108,6 +108,12 @@ struct RangeFile {
std::string fileName;
int64_t fileSize;
RangeFile() {}
RangeFile(Version v, uint32_t bSize, std::string name, int64_t size)
: version(v), blockSize(bSize), fileName(name), fileSize(size) {}
RangeFile(const RangeFile& f)
: version(f.version), blockSize(f.blockSize), fileName(f.fileName), fileSize(f.fileSize) {}
// Order by version, break ties with name
bool operator< (const RangeFile &rhs) const {
return version == rhs.version ? fileName < rhs.fileName : version < rhs.version;
@ -193,6 +199,14 @@ struct RestorableFileSet {
Version targetVersion;
std::vector<LogFile> logs;
std::vector<RangeFile> ranges;
// Range file's key ranges. Can be empty for backups generated before 6.3.
std::map<std::string, KeyRange> keyRanges;
// Mutation logs continuous range [begin, end). Both can be invalidVersion
// when the entire key space snapshot is at the target version.
Version continuousBeginVersion, continuousEndVersion;
KeyspaceSnapshotFile snapshot; // Info. for debug purposes
@ -231,11 +245,17 @@ public:
// Write a KeyspaceSnapshotFile of range file names representing a full non overlapping
// snapshot of the key ranges this backup is targeting.
virtual Future<Void> writeKeyspaceSnapshotFile(std::vector<std::string> fileNames, int64_t totalBytes) = 0;
virtual Future<Void> writeKeyspaceSnapshotFile(const std::vector<std::string>& fileNames,
const std::vector<std::pair<Key, Key>>& beginEndKeys,
int64_t totalBytes) = 0;
// Open a file for read by name
virtual Future<Reference<IAsyncFile>> readFile(std::string name) = 0;
// Returns the key ranges in the snapshot file. This is an expensive function
// and should only be used in simulation for sanity check.
virtual Future<KeyRange> getSnapshotFileKeyRange(const RangeFile& file) = 0;
struct ExpireProgress {
std::string step;
int total;
@ -2257,6 +2257,7 @@ namespace fileBackup {
std::vector<std::string> files;
std::vector<std::pair<Key, Key>> beginEndKeys;
state Version maxVer = 0;
state Version minVer = std::numeric_limits<Version>::max();
state int64_t totalBytes = 0;
@ -2272,6 +2273,9 @@ namespace fileBackup {
// Add file to final file list
// Add (beginKey, endKey) pairs to the list
beginEndKeys.emplace_back(i->second.begin, i->first);
// Update version range seen
if(r.version < minVer)
minVer = r.version;
@ -2293,7 +2297,7 @@ namespace fileBackup {
Params.endVersion().set(task, maxVer);
wait(bc->writeKeyspaceSnapshotFile(files, totalBytes));
wait(bc->writeKeyspaceSnapshotFile(files, beginEndKeys, totalBytes));
TraceEvent(SevInfo, "FileBackupWroteSnapshotManifest")
.detail("BackupUID", config.getUid())
@ -193,7 +193,7 @@ struct JSONDoc {
return v.get_value<T>();
// Ensures that a an Object exists at path and returns a JSONDoc that writes to it.
// Ensures that an Object exists at path and returns a JSONDoc that writes to it.
JSONDoc subDoc(std::string path, bool split=true) {
json_spirit::mValue &v = create(path, split);
if(v.type() != json_spirit::obj_type)
@ -83,6 +83,15 @@ std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> BackupProgr
auto progressIt = progress.lower_bound(epoch);
if (progressIt != progress.end() && progressIt->first == epoch) {
if (progressIt != progress.begin()) {
// Previous epoch is gone, consolidate the progress.
auto prev = std::prev(progressIt);
for (auto [tag, version] : prev->second) {
if (tags.count(tag) > 0) {
progressIt->second[tag] = std::max(version, progressIt->second[tag]);
updateTagVersions(&tagVersions, &tags, progressIt->second, info.epochEnd, adjustedBeginVersion, epoch);
} else {
auto rit = std::find_if(
@ -68,13 +68,15 @@ struct BackupData {
const UID myId;
const Tag tag; // LogRouter tag for this worker, i.e., (-2, i)
const int totalTags; // Total log router tags
const Version startVersion;
// Backup request's commit version. Mutations are logged at some version after this.
const Version startVersion; // This worker's start version
const Optional<Version> endVersion; // old epoch's end version (inclusive), or empty for current epoch
const LogEpoch recruitedEpoch; // current epoch whose tLogs are receiving mutations
const LogEpoch backupEpoch; // the epoch workers should pull mutations
LogEpoch oldestBackupEpoch = 0; // oldest epoch that still has data on tLogs for backup to pull
Version minKnownCommittedVersion;
Version savedVersion;
Version savedVersion; // Largest version saved to blob storage
Version popVersion; // Largest version popped in NOOP mode, can be larger than savedVersion.
AsyncVar<Reference<ILogSystem>> logSystem;
Database cx;
std::vector<VersionedMessage> messages;
@ -225,7 +227,7 @@ struct BackupData {
explicit BackupData(UID id, Reference<AsyncVar<ServerDBInfo>> db, const InitializeBackupRequest& req)
: myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion),
endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch),
minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1),
minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1), popVersion(req.startVersion - 1),
cc("BackupWorker", myId.toString()), pulledVersion(0), paused(false) {
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true);
@ -291,7 +293,7 @@ struct BackupData {
ASSERT_WE_THINK(backupEpoch == oldestBackupEpoch);
const Tag popTag = logSystem.get()->getPseudoPopTag(tag, ProcessClass::BackupClass);
logSystem.get()->pop(savedVersion, popTag);
logSystem.get()->pop(std::max(popVersion, savedVersion), popTag);
void stop() {
@ -326,11 +328,15 @@ struct BackupData {
bool modified = false;
bool minVersionChanged = false;
Version minVersion = std::numeric_limits<Version>::max();
for (const auto [uid, version] : uidVersions) {
auto it = backups.find(uid);
if (it == backups.end()) {
modified = true;
backups.emplace(uid, BackupData::PerBackupInfo(this, uid, version));
minVersion = std::min(minVersion, version);
minVersionChanged = true;
} else {
@ -342,6 +348,14 @@ struct BackupData {
modified = true;
if (minVersionChanged && backupEpoch < recruitedEpoch && savedVersion + 1 == startVersion) {
// Advance savedVersion to minimize version ranges in case backupEpoch's
// progress is not saved. Master may set a very low startVersion that
// is already popped. Advance the version is safe because these
// versions are not popped -- if they are popped, their progress should
// be already recorded and Master would use a higher version than minVersion.
savedVersion = std::max(minVersion, savedVersion);
if (modified) changedTrigger.trigger();
@ -390,10 +404,10 @@ struct BackupData {
Future<Version> getMinKnownCommittedVersion() { return _getMinKnownCommittedVersion(this); }
// Monitors "backupStartedKey". If "started" is true, wait until the key is set;
// Monitors "backupStartedKey". If "present" is true, wait until the key is set;
// otherwise, wait until the key is cleared. If "watch" is false, do not perform
// the wait for key set/clear events. Returns if key present.
ACTOR Future<bool> monitorBackupStartedKeyChanges(BackupData* self, bool started, bool watch) {
ACTOR Future<bool> monitorBackupStartedKeyChanges(BackupData* self, bool present, bool watch) {
loop {
state ReadYourWritesTransaction tr(self->cx);
@ -418,13 +432,13 @@ ACTOR Future<bool> monitorBackupStartedKeyChanges(BackupData* self, bool started
self->exitEarly = shouldExit;
if (started || !watch) return true;
if (present || !watch) return true;
} else {
TraceEvent("BackupWorkerEmptyStartKey", self->myId);
self->exitEarly = shouldExit;
if (!started || !watch) {
if (!present || !watch) {
return false;
@ -650,8 +664,13 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
self->insertRanges(keyRangeMap, it->second.ranges.get(), index);
if (it->second.lastSavedVersion == invalidVersion) {
it->second.lastSavedVersion =
self->savedVersion > self->startVersion ? self->savedVersion : self->startVersion;
if (it->second.startVersion > self->startVersion && !self->messages.empty()) {
// True-up first mutation log's begin version
it->second.lastSavedVersion = self->messages[0].getVersion();
} else {
it->second.lastSavedVersion =
std::max(self->popVersion, std::max(self->savedVersion, self->startVersion));
it->second.lastSavedVersion, popVersion + 1, blockSize, self->, self->totalTags));
@ -762,6 +781,7 @@ ACTOR Future<Void> uploadData(BackupData* self) {
if (((numMsg > 0 || popVersion > lastPopVersion) && self->pulling) || self->pullFinished()) {
TraceEvent("BackupWorkerSave", self->myId)
.detail("Version", popVersion)
.detail("SavedVersion", self->savedVersion)
.detail("MsgQ", self->messages.size());
// save an empty file for old epochs so that log file versions are continuous
wait(saveMutationsToFile(self, popVersion, numMsg));
@ -769,9 +789,11 @@ ACTOR Future<Void> uploadData(BackupData* self) {
// If transition into NOOP mode, should clear messages
if (!self->pulling) self->messages.clear();
if (!self->pulling) {
if (popVersion > self->savedVersion) {
if (popVersion > self->savedVersion && popVersion > self->popVersion) {
wait(saveProgress(self, popVersion));
TraceEvent("BackupWorkerSavedProgress", self->myId)
.detail("Tag", self->tag.toString())
@ -872,10 +894,13 @@ ACTOR Future<Void> monitorBackupKeyOrPullData(BackupData* self, bool keyPresent)
when(wait(success(present))) { break; }
when(wait(success(committedVersion) || delay(SERVER_KNOBS->BACKUP_NOOP_POP_DELAY, self->cx->taskID))) {
if (committedVersion.isReady()) {
self->savedVersion = std::max(committedVersion.get(), self->savedVersion);
self->popVersion =
std::max(self->popVersion, std::max(committedVersion.get(), self->savedVersion));
self->minKnownCommittedVersion =
std::max(committedVersion.get(), self->minKnownCommittedVersion);
TraceEvent("BackupWorkerNoopPop", self->myId).detail("SavedVersion", self->savedVersion);
TraceEvent("BackupWorkerNoopPop", self->myId)
.detail("SavedVersion", self->savedVersion)
.detail("PopVersion", self->popVersion);
self->pop(); // Pop while the worker is in this NOOP state.
committedVersion = Never();
} else {
@ -884,6 +909,7 @@ ACTOR Future<Void> monitorBackupKeyOrPullData(BackupData* self, bool keyPresent)
ASSERT(!keyPresent == present.get());
keyPresent = !keyPresent;
@ -297,63 +297,6 @@ std::string RestoreConfigFR::toString() {
// parallelFileRestore is copied from for the same reason as RestoreConfigFR is copied
namespace parallelFileRestore {
ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeRangeFileBlock(Reference<IAsyncFile> file, int64_t offset,
int len) {
state Standalone<StringRef> buf = makeString(len);
int rLen = wait(file->read(mutateString(buf), len, offset));
if (rLen != len) throw restore_bad_read();
Standalone<VectorRef<KeyValueRef>> results({}, buf.arena());
state StringRefReader reader(buf, restore_corrupted_data());
try {
// Read header, currently only decoding version 1001
if (reader.consume<int32_t>() != 1001) throw restore_unsupported_file_version();
// Read begin key, if this fails then block was invalid.
uint32_t kLen = reader.consumeNetworkUInt32();
const uint8_t* k = reader.consume(kLen);
results.push_back(results.arena(), KeyValueRef(KeyRef(k, kLen), ValueRef()));
// Read kv pairs and end key
while (1) {
// Read a key.
kLen = reader.consumeNetworkUInt32();
k = reader.consume(kLen);
// If eof reached or first value len byte is 0xFF then a valid block end was reached.
if (reader.eof() || *reader.rptr == 0xFF) {
results.push_back(results.arena(), KeyValueRef(KeyRef(k, kLen), ValueRef()));
// Read a value, which must exist or the block is invalid
uint32_t vLen = reader.consumeNetworkUInt32();
const uint8_t* v = reader.consume(vLen);
results.push_back(results.arena(), KeyValueRef(KeyRef(k, kLen), ValueRef(v, vLen)));
// If eof reached or first byte of next key len is 0xFF then a valid block end was reached.
if (reader.eof() || *reader.rptr == 0xFF) break;
// Make sure any remaining bytes in the block are 0xFF
for (auto b : reader.remainder())
if (b != 0xFF) throw restore_corrupted_data_padding();
return results;
} catch (Error& e) {
TraceEvent(SevError, "FileRestoreCorruptRangeFileBlock")
.detail("Filename", file->getFilename())
.detail("BlockOffset", offset)
.detail("BlockLen", len)
.detail("ErrorRelativeOffset", reader.rptr - buf.begin())
.detail("ErrorAbsoluteOffset", reader.rptr - buf.begin() + offset);
ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeLogFileBlock(Reference<IAsyncFile> file, int64_t offset,
int len) {
state Standalone<StringRef> buf = makeString(len);
@ -248,8 +248,6 @@ struct RestoreFileFR {
namespace parallelFileRestore {
ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeRangeFileBlock(Reference<IAsyncFile> file, int64_t offset,
int len);
ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeLogFileBlock(Reference<IAsyncFile> file, int64_t offset,
int len);
} // namespace parallelFileRestore
@ -23,6 +23,7 @@
#include "flow/UnitTest.h"
#include "fdbclient/BackupContainer.h"
#include "fdbclient/"
#include "fdbserver/"
#include "fdbserver/"
@ -817,11 +818,18 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
// The set of key value version is rangeFile.version. the key-value set in the same range file has the same version
Reference<IAsyncFile> inFile = wait(bc->readFile(asset.filename));
Standalone<VectorRef<KeyValueRef>> blockData =
wait(parallelFileRestore::decodeRangeFileBlock(inFile, asset.offset, asset.len));
.detail("DecodedRangeFile", asset.filename)
.detail("DataSize", blockData.contents().size());
state VectorRef<KeyValueRef> blockData;
try {
Standalone<VectorRef<KeyValueRef>> kvs =
wait(fileBackup::decodeRangeFileBlock(inFile, asset.offset, asset.len));
.detail("DecodedRangeFile", asset.filename)
.detail("DataSize", kvs.contents().size());
blockData = kvs;
} catch (Error& e) {
TraceEvent(SevError, "FileRestoreCorruptRangeFileBlock").error(e);
// First and last key are the range for this file
KeyRange fileRange = KeyRangeRef(blockData.front().key, blockData.back().key);
@ -717,22 +717,10 @@ ACTOR static Future<Version> collectBackupFiles(Reference<IBackupContainer> bc,
ACTOR static Future<Void> insertRangeVersion(KeyRangeMap<Version>* pRangeVersions, RestoreFileFR* file,
Reference<IBackupContainer> bc) {
TraceEvent("FastRestoreMasterDecodeRangeVersion").detail("File", file->toString());
state Reference<IAsyncFile> inFile = wait(bc->readFile(file->fileName));
state bool beginKeySet = false;
state Key beginKey;
state Key endKey;
state int64_t j = 0;
for (; j < file->fileSize; j += file->blockSize) {
int64_t len = std::min<int64_t>(file->blockSize, file->fileSize - j);
Standalone<VectorRef<KeyValueRef>> blockData = wait(parallelFileRestore::decodeRangeFileBlock(inFile, j, len));
if (!beginKeySet) {
beginKey = blockData.front().key;
endKey = blockData.back().key;
RangeFile rangeFile(file->version, file->blockSize, file->fileName, file->fileSize);
// First and last key are the range for this file: endKey is exclusive
KeyRange fileRange = KeyRangeRef(beginKey.contents(), endKey.contents());
KeyRange fileRange = wait(bc->getSnapshotFileKeyRange(rangeFile));
.detail("DecodedRangeFile", file->fileName)
.detail("KeyRange", fileRange)
@ -35,8 +35,8 @@
#include <cstdint>
#include <cstdarg>
//#define SevFRMutationInfo SevVerbose
#define SevFRMutationInfo SevInfo
#define SevFRMutationInfo SevVerbose
//#define SevFRMutationInfo SevInfo
struct VersionedMutation {
MutationRef mutation;
@ -188,6 +188,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
bool remoteLogsWrittenToCoreState;
bool hasRemoteServers;
AsyncTrigger backupWorkerChanged;
std::set<UID> removedBackupWorkers; // Workers that are removed before setting them.
Optional<Version> recoverAt;
Optional<Version> recoveredAt;
@ -1399,6 +1400,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
LogEpoch logsetEpoch = this->epoch;
oldestBackupEpoch = this->epoch;
for (const auto& reply : replies) {
if (removedBackupWorkers.count( > 0) {
Reference<AsyncVar<OptionalInterface<BackupInterface>>> worker(new AsyncVar<OptionalInterface<BackupInterface>>(OptionalInterface<BackupInterface>(reply.interf)));
if (reply.backupEpoch != logsetEpoch) {
// find the logset from oldLogData
@ -1408,6 +1413,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
TraceEvent("AddBackupWorker", dbgid)
.detail("Epoch", logsetEpoch)
TraceEvent("SetOldestBackupEpoch", dbgid).detail("Epoch", oldestBackupEpoch);
@ -1434,6 +1442,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
} else {
TraceEvent("RemoveBackupWorker", dbgid)
@ -450,7 +450,8 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
targetVersion = desc.minRestorableVersion.get();
} else if (deterministicRandom()->random01() < 0.1) {
targetVersion = desc.maxRestorableVersion.get();
} else if (deterministicRandom()->random01() < 0.5) {
} else if (deterministicRandom()->random01() < 0.5 &&
desc.minRestorableVersion.get() < desc.contiguousLogEnd.get()) {
// The assertion may fail because minRestorableVersion may be decided by snapshot version.
// ASSERT_WE_THINK(desc.minRestorableVersion.get() <= desc.contiguousLogEnd.get());
// This assertion can fail when contiguousLogEnd < maxRestorableVersion and
@ -953,7 +953,15 @@ namespace actorcompiler
// if it has side effects
||||"if (!{0}->SAV<{1}>::futures) {{ (void)({2}); this->~{3}(); {0}->destroy(); return 0; }}", This, actor.returnType, stmt.expression, stateClassName);
// Build the return value directly in SAV<T>::value_storage
||||"new (&{0}->SAV< {1} >::value()) {1}({2});", This, actor.returnType, stmt.expression);
// If the expression is exactly the name of a state variable, std::move() it
if (state.Exists(s => == stmt.expression))
||||"new (&{0}->SAV< {1} >::value()) {1}(std::move({2})); // state_var_RVO", This, actor.returnType, stmt.expression);
||||"new (&{0}->SAV< {1} >::value()) {1}({2});", This, actor.returnType, stmt.expression);
// Destruct state
||||"this->~{0}();", stateClassName);
// Tell SAV<T> to return the value we already constructed in value_storage
@ -687,6 +687,11 @@ public:
Future(T&& presentValue)
: sav(new SAV<T>(1, 0))
: sav(new SAV<T>(1, 0))
Reference in New Issue