Merge branch 'master' into transaction-tagging

# Conflicts:
#	fdbclient/MasterProxyInterface.h
#	fdbclient/NativeAPI.actor.cpp
This commit is contained in:
A.J. Beamon 2020-05-04 10:23:25 -07:00
commit 36454bb3b8
68 changed files with 1312 additions and 605 deletions

View File

@ -146,6 +146,10 @@ set(SEED "0x${SEED_}" CACHE STRING "Random seed for testing")
# components
################################################################################
if(CMAKE_SYSTEM_NAME STREQUAL "FreeBSD")
include_directories(/usr/local/include)
endif()
include(CompileBoost)
add_subdirectory(flow)
add_subdirectory(fdbrpc)
@ -173,6 +177,10 @@ else()
include(CPack)
endif()
if(CMAKE_SYSTEM_NAME STREQUAL "FreeBSD")
add_link_options(-lexecinfo)
endif()
################################################################################
# process compile commands for IDE
################################################################################

View File

@ -123,6 +123,37 @@ cmake -G Xcode -DOPEN_FOR_IDE=ON <FDB_SOURCE_DIRECTORY>
You should create a second build-directory which you will use for building
(probably with make or ninja) and debugging.
#### FreeBSD
1. Check out this repo on your server.
1. Install compile-time dependencies from ports.
1. (Optional) Use tmpfs & ccache for significantly faster repeat builds
1. (Optional) Install a [JDK](https://www.freshports.org/java/openjdk8/)
for Java Bindings. FoundationDB currently builds with Java 8.
1. Navigate to the directory where you checked out the foundationdb
repo.
1. Build from source.
```shell
sudo pkg install -r FreeBSD \
shells/bash devel/cmake devel/ninja devel/ccache \
lang/mono lang/python3 \
devel/boost-libs devel/libeio \
security/openssl
mkdir .build && cd .build
cmake -G Ninja \
-DUSE_CCACHE=on \
-DDISABLE_TLS=off \
-DUSE_DTRACE=off \
..
ninja -j 10
# run fast tests
ctest -L fast
# run all tests
ctest --output-on-failure -v
```
### Linux
There are no special requirements for Linux. A docker image can be pulled from

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
######################################################
#
# FoundationDB Binding Test Script

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
LOGGING_LEVEL=WARNING

View File

@ -61,7 +61,7 @@ def write_windows_asm(asmfile, functions):
def write_unix_asm(asmfile, functions, prefix):
asmfile.write(".intel_syntax noprefix\n")
if platform == "linux":
if platform == "linux" or platform == "freebsd":
asmfile.write("\n.data\n")
for f in functions:
asmfile.write("\t.extern fdb_api_ptr_%s\n" % f)

View File

@ -12,6 +12,9 @@
#if defined(__linux__)
#include <linux/limits.h>
#elif defined(__FreeBSD__)
#include <sys/stat.h>
#define CLOCK_MONOTONIC_COARSE CLOCK_MONOTONIC_FAST
#elif defined(__APPLE__)
#include <sys/syslimits.h>
#define CLOCK_MONOTONIC_COARSE CLOCK_MONOTONIC

View File

@ -25,6 +25,9 @@ platform=$(uname)
if [[ "${platform}" == "Darwin" ]] ; then
FDBLIBDIR="${FDBLIBDIR:-/usr/local/lib}"
libfdbc="libfdb_c.dylib"
elif [[ "${platform}" == "FreeBSD" ]] ; then
FDBLIBDIR="${FDBLIBDIR:-/lib}"
libfdbc="libfdb_c.so"
elif [[ "${platform}" == "Linux" ]] ; then
libfdbc="libfdb_c.so"
custom_libdir="${FDBLIBDIR:-}"
@ -248,8 +251,11 @@ else
:
elif [[ "${status}" -eq 0 ]] ; then
echo "Building generated files."
if [[ "${platform}" == "FreeBSD" ]] ; then
cmd=( 'gmake' '-C' "${fdbdir}" 'bindings/c/foundationdb/fdb_c_options.g.h' )
else
cmd=( 'make' '-C' "${fdbdir}" 'bindings/c/foundationdb/fdb_c_options.g.h' )
fi
echo "${cmd[*]}"
if ! "${cmd[@]}" ; then
let status="${status} + 1"

View File

@ -23,6 +23,8 @@
package directory
import (
"fmt"
"strings"
"github.com/apple/foundationdb/bindings/go/src/fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb/subspace"
)
@ -43,6 +45,18 @@ type directorySubspace struct {
layer []byte
}
// String implements the fmt.Stringer interface and returns human-readable
// string representation of this object.
func (ds directorySubspace) String() string {
var path string
if len(ds.path) > 0 {
path = "(" + strings.Join(ds.path, ",") + ")"
} else {
path = "nil"
}
return fmt.Sprintf("DirectorySubspace(%s, %s)", path, fdb.Printable(ds.Bytes()))
}
func (d directorySubspace) CreateOrOpen(t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error) {
return d.dl.CreateOrOpen(t, d.dl.partitionSubpath(d.path, path), layer)
}

View File

@ -35,6 +35,8 @@ package subspace
import (
"bytes"
"errors"
"fmt"
"github.com/apple/foundationdb/bindings/go/src/fdb"
"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
)
@ -82,7 +84,7 @@ type Subspace interface {
}
type subspace struct {
b []byte
rawPrefix []byte
}
// AllKeys returns the Subspace corresponding to all keys in a FoundationDB
@ -105,40 +107,46 @@ func FromBytes(b []byte) Subspace {
return subspace{s}
}
// String implements the fmt.Stringer interface and return the subspace
// as a human readable byte string provided by fdb.Printable.
func (s subspace) String() string {
return fmt.Sprintf("Subspace(rawPrefix=%s)", fdb.Printable(s.rawPrefix))
}
func (s subspace) Sub(el ...tuple.TupleElement) Subspace {
return subspace{concat(s.Bytes(), tuple.Tuple(el).Pack()...)}
}
func (s subspace) Bytes() []byte {
return s.b
return s.rawPrefix
}
func (s subspace) Pack(t tuple.Tuple) fdb.Key {
return fdb.Key(concat(s.b, t.Pack()...))
return fdb.Key(concat(s.rawPrefix, t.Pack()...))
}
func (s subspace) PackWithVersionstamp(t tuple.Tuple) (fdb.Key, error) {
return t.PackWithVersionstamp(s.b)
return t.PackWithVersionstamp(s.rawPrefix)
}
func (s subspace) Unpack(k fdb.KeyConvertible) (tuple.Tuple, error) {
key := k.FDBKey()
if !bytes.HasPrefix(key, s.b) {
if !bytes.HasPrefix(key, s.rawPrefix) {
return nil, errors.New("key is not in subspace")
}
return tuple.Unpack(key[len(s.b):])
return tuple.Unpack(key[len(s.rawPrefix):])
}
func (s subspace) Contains(k fdb.KeyConvertible) bool {
return bytes.HasPrefix(k.FDBKey(), s.b)
return bytes.HasPrefix(k.FDBKey(), s.rawPrefix)
}
func (s subspace) FDBKey() fdb.Key {
return fdb.Key(s.b)
return fdb.Key(s.rawPrefix)
}
func (s subspace) FDBRangeKeys() (fdb.KeyConvertible, fdb.KeyConvertible) {
return fdb.Key(concat(s.b, 0x00)), fdb.Key(concat(s.b, 0xFF))
return fdb.Key(concat(s.rawPrefix, 0x00)), fdb.Key(concat(s.rawPrefix, 0xFF))
}
func (s subspace) FDBRangeKeySelectors() (fdb.Selectable, fdb.Selectable) {

View File

@ -0,0 +1,15 @@
package subspace
import (
"fmt"
"testing"
)
func TestSubspaceString(t *testing.T) {
printed := fmt.Sprint(Sub([]byte("hello"), "world", 42, 0x99))
expected := "Subspace(rawPrefix=\\x01hello\\x00\\x02world\\x00\\x15*\\x15\\x99)"
if printed != expected {
t.Fatalf("printed subspace result differs, expected %v, got %v", expected, printed)
}
}

View File

@ -43,6 +43,8 @@ import (
"fmt"
"math"
"math/big"
"strconv"
"strings"
"github.com/apple/foundationdb/bindings/go/src/fdb"
)
@ -66,6 +68,48 @@ type TupleElement interface{}
// packing T (modulo type normalization to []byte, uint64, and int64).
type Tuple []TupleElement
// String implements the fmt.Stringer interface and returns human-readable
// string representation of this tuple. For most elements, we use the
// object's default string representation.
func (tuple Tuple) String() string {
sb := strings.Builder{}
printTuple(tuple, &sb)
return sb.String()
}
func printTuple(tuple Tuple, sb *strings.Builder) {
sb.WriteString("(")
for i, t := range tuple {
switch t := t.(type) {
case Tuple:
printTuple(t, sb)
case nil:
sb.WriteString("<nil>")
case string:
sb.WriteString(strconv.Quote(t))
case UUID:
sb.WriteString("UUID(")
sb.WriteString(t.String())
sb.WriteString(")")
case []byte:
sb.WriteString("b\"")
sb.WriteString(fdb.Printable(t))
sb.WriteString("\"")
default:
// For user-defined and standard types, we use standard Go
// printer, which itself uses Stringer interface.
fmt.Fprintf(sb, "%v", t)
}
if (i < len(tuple) - 1) {
sb.WriteString(", ")
}
}
sb.WriteString(")")
}
// UUID wraps a basic byte array as a UUID. We do not provide any special
// methods for accessing or generating the UUID, but as Go does not provide
// a built-in UUID type, this simple wrapper allows for other libraries
@ -73,6 +117,10 @@ type Tuple []TupleElement
// an instance of this type.
type UUID [16]byte
func (uuid UUID) String() string {
return fmt.Sprintf("%x-%x-%x-%x-%x", uuid[0:4], uuid[4:6], uuid[6:8], uuid[8:10], uuid[10:])
}
// Versionstamp is struct for a FoundationDB verionstamp. Versionstamps are
// 12 bytes long composed of a 10 byte transaction version and a 2 byte user
// version. The transaction version is filled in at commit time and the user
@ -82,6 +130,11 @@ type Versionstamp struct {
UserVersion uint16
}
// Returns a human-readable string for this Versionstamp.
func (vs Versionstamp) String() string {
return fmt.Sprintf("Versionstamp(%s, %d)", fdb.Printable(vs.TransactionVersion[:]), vs.UserVersion)
}
var incompleteTransactionVersion = [10]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
const versionstampLength = 12

View File

@ -4,6 +4,7 @@ import (
"bytes"
"encoding/gob"
"flag"
"fmt"
"math/rand"
"os"
"testing"
@ -118,3 +119,38 @@ func BenchmarkTuplePacking(b *testing.B) {
})
}
}
func TestTupleString(t *testing.T) {
testCases :=[ ]struct {
input Tuple
expected string
}{
{
Tuple{[]byte("hello"), "world", 42, 0x99},
"(b\"hello\", \"world\", 42, 153)",
},
{
Tuple{nil, Tuple{"Ok", Tuple{1, 2}, "Go"}, 42, 0x99},
"(<nil>, (\"Ok\", (1, 2), \"Go\"), 42, 153)",
},
{
Tuple{"Bool", true, false},
"(\"Bool\", true, false)",
},
{
Tuple{"UUID", testUUID},
"(\"UUID\", UUID(1100aabb-ccdd-eeff-1100-aabbccddeeff))",
},
{
Tuple{"Versionstamp", Versionstamp{[10]byte{0, 0, 0, 0xaa, 0, 0xbb, 0, 0xcc, 0, 0xdd}, 620}},
"(\"Versionstamp\", Versionstamp(\\x00\\x00\\x00\\xaa\\x00\\xbb\\x00\\xcc\\x00\\xdd, 620))",
},
}
for _, testCase := range testCases {
printed := fmt.Sprint(testCase.input)
if printed != testCase.expected {
t.Fatalf("printed tuple result differs, expected %v, got %v", testCase.expected, printed)
}
}
}

View File

@ -1231,6 +1231,8 @@ if platform.system() == 'Windows':
capi_name = 'fdb_c.dll'
elif platform.system() == 'Linux':
capi_name = 'libfdb_c.so'
elif platform.system() == 'FreeBSD':
capi_name = 'libfdb_c.so'
elif platform.system() == 'Darwin':
capi_name = 'libfdb_c.dylib'
elif sys.platform == 'win32':

View File

@ -278,7 +278,51 @@ function(package_bindingtester)
COMMAND ${CMAKE_COMMAND} -E copy_directory ${CMAKE_SOURCE_DIR}/bindings ${CMAKE_BINARY_DIR}/bindingtester/tests
COMMAND ${CMAKE_COMMAND} -E touch "${CMAKE_BINARY_DIR}/bindingtester.touch"
COMMENT "Copy test files for bindingtester")
add_custom_target(copy_bindingtester_binaries DEPENDS ${outfiles} "${CMAKE_BINARY_DIR}/bindingtester.touch")
add_custom_target(copy_binding_output_files DEPENDS ${CMAKE_BINARY_DIR}/bindingtester.touch python_binding fdb_flow_tester)
add_custom_command(
TARGET copy_binding_output_files
COMMAND ${CMAKE_COMMAND} -E copy $<TARGET_FILE:fdb_flow_tester> ${bdir}/tests/flow/bin/fdb_flow_tester
COMMENT "Copy Flow tester for bindingtester")
set(generated_binding_files python/fdb/fdboptions.py)
if(WITH_JAVA)
if(NOT FDB_RELEASE)
set(prerelease_string "-PRERELEASE")
else()
set(prerelease_string "")
endif()
add_custom_command(
TARGET copy_binding_output_files
COMMAND ${CMAKE_COMMAND} -E copy
${CMAKE_BINARY_DIR}/packages/fdb-java-${CMAKE_PROJECT_VERSION}${prerelease_string}.jar
${bdir}/tests/java/foundationdb-client.jar
COMMENT "Copy Java bindings for bindingtester")
add_dependencies(copy_binding_output_files fat-jar)
add_dependencies(copy_binding_output_files foundationdb-tests)
set(generated_binding_files ${generated_binding_files} java/foundationdb-tests.jar)
endif()
if(WITH_GO AND NOT OPEN_FOR_IDE)
add_dependencies(copy_binding_output_files fdb_go_tester fdb_go)
add_custom_command(
TARGET copy_binding_output_files
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_BINARY_DIR}/bindings/go/bin/_stacktester ${bdir}/tests/go/build/bin/_stacktester
COMMAND ${CMAKE_COMMAND} -E copy
${CMAKE_BINARY_DIR}/bindings/go/src/github.com/apple/foundationdb/bindings/go/src/fdb/generated.go # SRC
${bdir}/tests/go/src/fdb/ # DEST
COMMENT "Copy generated.go for bindingtester")
endif()
foreach(generated IN LISTS generated_binding_files)
add_custom_command(
TARGET copy_binding_output_files
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_BINARY_DIR}/bindings/${generated} ${bdir}/tests/${generated}
COMMENT "Copy ${generated} to bindingtester")
endforeach()
add_custom_target(copy_bindingtester_binaries
DEPENDS ${outfiles} "${CMAKE_BINARY_DIR}/bindingtester.touch" copy_binding_output_files)
add_dependencies(copy_bindingtester_binaries strip_only_fdbserver strip_only_fdbcli strip_only_fdb_c)
set(tar_file ${CMAKE_BINARY_DIR}/packages/bindingtester-${CMAKE_PROJECT_VERSION}.tar.gz)
add_custom_command(

View File

@ -1,6 +1,7 @@
include(CompilerChecks)
env_set(USE_GPERFTOOLS OFF BOOL "Use gperfools for profiling")
env_set(USE_DTRACE ON BOOL "Enable dtrace probes on supported platforms")
env_set(USE_VALGRIND OFF BOOL "Compile for valgrind usage")
env_set(USE_VALGRIND_FOR_CTEST ${USE_VALGRIND} BOOL "Use valgrind for ctest")
env_set(ALLOC_INSTRUMENTATION OFF BOOL "Instrument alloc")
@ -255,7 +256,7 @@ else()
check_symbol_exists(DTRACE_PROBE sys/sdt.h SUPPORT_DTRACE)
check_symbol_exists(aligned_alloc stdlib.h HAS_ALIGNED_ALLOC)
message(STATUS "Has aligned_alloc: ${HAS_ALIGNED_ALLOC}")
if(SUPPORT_DTRACE)
if((SUPPORT_DTRACE) AND (USE_DTRACE))
add_compile_definitions(DTRACE_PROBES)
endif()
if(HAS_ALIGNED_ALLOC)

View File

@ -47,7 +47,8 @@ endif()
set(WITH_JAVA OFF)
find_package(JNI 1.8)
find_package(Java 1.8 COMPONENTS Development)
if(JNI_FOUND AND Java_FOUND AND Java_Development_FOUND)
# leave FreeBSD JVM compat for later
if(JNI_FOUND AND Java_FOUND AND Java_Development_FOUND AND NOT (CMAKE_SYSTEM_NAME STREQUAL "FreeBSD"))
set(WITH_JAVA ON)
include(UseJava)
enable_language(Java)

View File

@ -2198,13 +2198,15 @@ ACTOR Future<Void> runFastRestoreAgent(Database db, std::string tagName, std::st
state Version restoreVersion = invalidVersion;
if (ranges.size() > 1) {
fprintf(stderr, "Currently only a single restore range is supported!\n");
throw restore_error();
fprintf(stdout, "[WARNING] Currently only a single restore range is tested!\n");
}
state KeyRange range = (ranges.size() == 0) ? normalKeys : ranges.front();
if (ranges.size() == 0) {
ranges.push_back(ranges.arena(), normalKeys);
}
printf("[INFO] runFastRestoreAgent: num_ranges:%d restore_range:%s\n", ranges.size(), range.toString().c_str());
printf("[INFO] runFastRestoreAgent: restore_ranges:%d first range:%s\n", ranges.size(),
ranges.front().toString().c_str());
if (performRestore) {
if (dbVersion == invalidVersion) {

View File

@ -505,8 +505,6 @@ Standalone<VectorRef<KeyRangeRef>> getApplyRanges(Version beginVersion, Version
Future<Void> eraseLogData(Reference<ReadYourWritesTransaction> tr, Key logUidValue, Key destUidValue, Optional<Version> endVersion = Optional<Version>(), bool checkBackupUid = false, Version backupUid = 0);
Key getApplyKey( Version version, Key backupUid );
std::pair<Version, uint32_t> decodeBKMutationLogKey(Key key);
Standalone<VectorRef<MutationRef>> decodeBackupLogValue(StringRef value);
void decodeBackupLogValue(Arena& arena, VectorRef<MutationRef>& result, int64_t& mutationSize, StringRef value, StringRef addPrefix = StringRef(), StringRef removePrefix = StringRef());
Future<Void> logError(Database cx, Key keyErrors, const std::string& message);
Future<Void> logError(Reference<ReadYourWritesTransaction> tr, Key keyErrors, const std::string& message);
Future<Void> checkVersion(Reference<ReadYourWritesTransaction> const& tr);

View File

@ -209,69 +209,6 @@ std::pair<Version, uint32_t> decodeBKMutationLogKey(Key key) {
bigEndian32(*(int32_t*)(key.begin() + backupLogPrefixBytes + sizeof(UID) + sizeof(uint8_t) + sizeof(int64_t))));
}
// value is an iterable representing all of the transaction log data for
// a given version.Returns an iterable(generator) yielding a tuple for
// each mutation in the log.At present, all mutations are represented as
// (type, param1, param2) where type is an integer and param1 and param2 are byte strings
Standalone<VectorRef<MutationRef>> decodeBackupLogValue(StringRef value) {
try {
uint64_t offset(0);
uint64_t protocolVersion = 0;
memcpy(&protocolVersion, value.begin(), sizeof(uint64_t));
offset += sizeof(uint64_t);
if (protocolVersion <= 0x0FDB00A200090001){
TraceEvent(SevError, "DecodeBackupLogValue").detail("IncompatibleProtocolVersion", protocolVersion)
.detail("ValueSize", value.size()).detail("Value", value);
throw incompatible_protocol_version();
}
Standalone<VectorRef<MutationRef>> result;
uint32_t totalBytes = 0;
memcpy(&totalBytes, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
uint32_t consumed = 0;
if(totalBytes + offset > value.size())
throw restore_missing_data();
int originalOffset = offset;
while (consumed < totalBytes){
uint32_t type = 0;
memcpy(&type, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
uint32_t len1 = 0;
memcpy(&len1, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
uint32_t len2 = 0;
memcpy(&len2, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
MutationRef logValue;
logValue.type = type;
logValue.param1 = value.substr(offset, len1);
offset += len1;
logValue.param2 = value.substr(offset, len2);
offset += len2;
result.push_back_deep(result.arena(), logValue);
consumed += BackupAgentBase::logHeaderSize + len1 + len2;
}
ASSERT(consumed == totalBytes);
if (value.size() != offset) {
TraceEvent(SevError, "BA_DecodeBackupLogValue").detail("UnexpectedExtraDataSize", value.size()).detail("Offset", offset).detail("TotalBytes", totalBytes).detail("Consumed", consumed).detail("OriginalOffset", originalOffset);
throw restore_corrupted_data();
}
return result;
}
catch (Error& e) {
TraceEvent(e.code() == error_code_restore_missing_data ? SevWarn : SevError, "BA_DecodeBackupLogValue").error(e).GetLastError().detail("ValueSize", value.size()).detail("Value", value);
throw;
}
}
void decodeBackupLogValue(Arena& arena, VectorRef<MutationRef>& result, int& mutationSize, StringRef value, StringRef addPrefix, StringRef removePrefix, Version version, Reference<KeyRangeMap<Version>> key_version) {
try {
uint64_t offset(0);

View File

@ -1373,7 +1373,8 @@ public:
wait(bc->readKeyspaceSnapshot(snapshot.get()));
restorable.ranges = std::move(results.first);
restorable.keyRanges = std::move(results.second);
if (g_network->isSimulated()) {
// TODO: Reenable the sanity check after TooManyFiles error is resolved
if (false && g_network->isSimulated()) {
// Sanity check key ranges
state std::map<std::string, KeyRange>::iterator rit;
for (rit = restorable.keyRanges.begin(); rit != restorable.keyRanges.end(); rit++) {
@ -1776,7 +1777,6 @@ public:
virtual ~BackupContainerBlobStore() {}
Future<Reference<IAsyncFile>> readFile(std::string path) final {
ASSERT(m_bstore->knobs.read_ahead_blocks > 0);
return Reference<IAsyncFile>(
new AsyncFileReadAheadCache(
Reference<IAsyncFile>(new AsyncFileBlobStoreRead(m_bstore, m_bucket, dataPath(path))),

View File

@ -98,19 +98,26 @@ struct MutationRef {
}
std::string toString() const {
if (type < MutationRef::MAX_ATOMIC_OP) {
return format("code: %s param1: %s param2: %s", typeString[type], printable(param1).c_str(), printable(param2).c_str());
}
else {
return format("code: Invalid param1: %s param2: %s", printable(param1).c_str(), printable(param2).c_str());
}
return format("code: %s param1: %s param2: %s",
type < MutationRef::MAX_ATOMIC_OP ? typeString[(int)type] : "Unset", printable(param1).c_str(),
printable(param2).c_str());
}
bool isAtomicOp() const { return (ATOMIC_MASK & (1 << type)) != 0; }
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, type, param1, param2);
if (!ar.isDeserializing && type == ClearRange && equalsKeyAfter(param1, param2)) {
StringRef empty;
serializer(ar, type, param2, empty);
} else {
serializer(ar, type, param1, param2);
}
if (ar.isDeserializing && type == ClearRange && param2 == StringRef() && param1 != StringRef()) {
ASSERT(param1[param1.size()-1] == '\x00');
param2 = param1;
param1 = param2.substr(0, param2.size()-1);
}
}
// These masks define which mutation types have particular properties (they are used to implement isSingleKeyMutation() etc)
@ -129,6 +136,10 @@ static inline std::string getTypeString(MutationRef::Type type) {
return type < MutationRef::MAX_ATOMIC_OP ? typeString[(int)type] : "Unset";
}
static inline std::string getTypeString(uint8_t type) {
return type < MutationRef::MAX_ATOMIC_OP ? typeString[type] : "Unset";
}
// A 'single key mutation' is one which affects exactly the value of the key specified by its param1
static inline bool isSingleKeyMutation(MutationRef::Type type) {
return (MutationRef::SINGLE_KEY_MASK & (1<<type)) != 0;

View File

@ -45,7 +45,7 @@ private:
};
typedef MultiInterface<ReferencedInterface<StorageServerInterface>> LocationInfo;
typedef MultiInterface<MasterProxyInterface> ProxyInfo;
typedef ModelInterface<MasterProxyInterface> ProxyInfo;
class ClientTagThrottleData : NonCopyable {
private:

View File

@ -282,7 +282,18 @@ struct KeyRangeRef {
template <class Ar>
force_inline void serialize(Ar& ar) {
serializer(ar, const_cast<KeyRef&>(begin), const_cast<KeyRef&>(end));
if (!ar.isDeserializing && equalsKeyAfter(begin, end)) {
StringRef empty;
serializer(ar, const_cast<KeyRef&>(end), empty);
} else {
serializer(ar, const_cast<KeyRef&>(begin), const_cast<KeyRef&>(end));
}
if (ar.isDeserializing && end == StringRef() && begin != StringRef()) {
ASSERT(begin[begin.size()-1] == '\x00');
const_cast<KeyRef&>(end) = begin;
const_cast<KeyRef&>(begin) = end.substr(0, end.size()-1);
}
if( begin > end ) {
TraceEvent("InvertedRange").detail("Begin", begin).detail("End", end);
throw inverted_range();

View File

@ -160,7 +160,7 @@ static inline int getBytes( CommitTransactionRequest const& r ) {
return total;
}
struct GetReadVersionReply {
struct GetReadVersionReply : public BasicLoadBalancedReply {
constexpr static FileIdentifier file_identifier = 15709388;
Version version;
bool locked;
@ -172,7 +172,7 @@ struct GetReadVersionReply {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version, locked, metadataVersion, tagThrottleInfo);
serializer(ar, BasicLoadBalancedReply::recentRequests, version, locked, metadataVersion, tagThrottleInfo);
}
};

View File

@ -473,7 +473,7 @@ ACTOR static Future<HealthMetrics> getHealthMetricsActor(DatabaseContext *cx, bo
choose {
when(wait(cx->onMasterProxiesChanged())) {}
when(GetHealthMetricsReply rep =
wait(loadBalance(cx->getMasterProxies(false), &MasterProxyInterface::getHealthMetrics,
wait(basicLoadBalance(cx->getMasterProxies(false), &MasterProxyInterface::getHealthMetrics,
GetHealthMetricsRequest(sendDetailedRequest)))) {
cx->healthMetrics.update(rep.healthMetrics, detailed, true);
if (detailed) {
@ -700,7 +700,7 @@ void DatabaseContext::setOption( FDBDatabaseOptions::Option option, Optional<Str
case FDBDatabaseOptions::MACHINE_ID:
clientLocality = LocalityData( clientLocality.processId(), value.present() ? Standalone<StringRef>(value.get()) : Optional<Standalone<StringRef>>(), clientLocality.machineId(), clientLocality.dcId() );
if( clientInfo->get().proxies.size() )
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, clientLocality ) );
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies ) );
server_interf.clear();
locationCache.insert( allKeys, Reference<LocationInfo>() );
break;
@ -710,7 +710,7 @@ void DatabaseContext::setOption( FDBDatabaseOptions::Option option, Optional<Str
case FDBDatabaseOptions::DATACENTER_ID:
clientLocality = LocalityData(clientLocality.processId(), clientLocality.zoneId(), clientLocality.machineId(), value.present() ? Standalone<StringRef>(value.get()) : Optional<Standalone<StringRef>>());
if( clientInfo->get().proxies.size() )
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, clientLocality ));
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies ));
server_interf.clear();
locationCache.insert( allKeys, Reference<LocationInfo>() );
break;
@ -1092,7 +1092,7 @@ Reference<ProxyInfo> DatabaseContext::getMasterProxies(bool useProvisionalProxie
masterProxiesLastChange = clientInfo->get().id;
masterProxies.clear();
if( clientInfo->get().proxies.size() ) {
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies, clientLocality ));
masterProxies = Reference<ProxyInfo>( new ProxyInfo( clientInfo->get().proxies ));
provisional = clientInfo->get().proxies[0].provisional;
}
}
@ -1239,7 +1239,7 @@ ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation_internal(
++cx->transactionKeyServerLocationRequests;
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetKeyServerLocationsReply rep = wait( loadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(key, Optional<KeyRef>(), 100, isBackward, key.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) {
when ( GetKeyServerLocationsReply rep = wait( basicLoadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(key, Optional<KeyRef>(), 100, isBackward, key.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) {
++cx->transactionKeyServerLocationRequestsCompleted;
if( info.debugID.present() )
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.After");
@ -1278,7 +1278,7 @@ ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLoca
++cx->transactionKeyServerLocationRequests;
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetKeyServerLocationsReply _rep = wait( loadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(keys.begin, keys.end, limit, reverse, keys.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) {
when ( GetKeyServerLocationsReply _rep = wait( basicLoadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(keys.begin, keys.end, limit, reverse, keys.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) {
++cx->transactionKeyServerLocationRequestsCompleted;
state GetKeyServerLocationsReply rep = _rep;
if( info.debugID.present() )
@ -1530,7 +1530,7 @@ ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version ) {
loop {
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetReadVersionReply v = wait( loadBalance( cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion, GetReadVersionRequest( 0, TransactionPriority::IMMEDIATE ), cx->taskID ) ) ) {
when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion, GetReadVersionRequest( 0, TransactionPriority::IMMEDIATE ), cx->taskID ) ) ) {
cx->minAcceptableReadVersion = std::min(cx->minAcceptableReadVersion, v.version);
if (v.version >= version)
@ -1550,7 +1550,7 @@ ACTOR Future<Version> getRawVersion( Database cx ) {
loop {
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetReadVersionReply v = wait( loadBalance( cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion, GetReadVersionRequest( 0, TransactionPriority::IMMEDIATE ), cx->taskID ) ) ) {
when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion, GetReadVersionRequest( 0, TransactionPriority::IMMEDIATE ), cx->taskID ) ) ) {
return v.version;
}
}
@ -2838,7 +2838,7 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
reply = proxies.size() ? throwErrorOr ( brokenPromiseToMaybeDelivered ( proxies[0].commit.tryGetReply(req) ) ) : Never();
}
} else {
reply = loadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::commit, req, TaskPriority::DefaultPromiseEndpoint, true );
reply = basicLoadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::commit, req, TaskPriority::DefaultPromiseEndpoint, true );
}
choose {
@ -3248,7 +3248,7 @@ ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx,
state GetReadVersionRequest req( transactionCount, priority, flags, tags, debugID );
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetReadVersionReply v = wait( loadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) {
when ( GetReadVersionReply v = wait( basicLoadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) {
auto &priorityThrottledTags = cx->throttledTags[priority];
for(auto& tag : tags) {
auto itr = v.tagThrottleInfo.find(tag.first);
@ -3792,7 +3792,7 @@ ACTOR Future<Void> snapCreate(Database cx, Standalone<StringRef> snapCmd, UID sn
loop {
choose {
when(wait(cx->onMasterProxiesChanged())) {}
when(wait(loadBalance(cx->getMasterProxies(false), &MasterProxyInterface::proxySnapReq, ProxySnapRequest(snapCmd, snapUID, snapUID), cx->taskID, true /*atmostOnce*/ ))) {
when(wait(basicLoadBalance(cx->getMasterProxies(false), &MasterProxyInterface::proxySnapReq, ProxySnapRequest(snapCmd, snapUID, snapUID), cx->taskID, true /*atmostOnce*/ ))) {
TraceEvent("SnapCreateExit")
.detail("SnapCmd", snapCmd.toString())
.detail("UID", snapUID);
@ -3820,7 +3820,7 @@ ACTOR Future<bool> checkSafeExclusions(Database cx, vector<AddressExclusion> exc
choose {
when(wait(cx->onMasterProxiesChanged())) {}
when(ExclusionSafetyCheckReply _ddCheck =
wait(loadBalance(cx->getMasterProxies(false), &MasterProxyInterface::exclusionSafetyCheckReq,
wait(basicLoadBalance(cx->getMasterProxies(false), &MasterProxyInterface::exclusionSafetyCheckReq,
req, cx->taskID))) {
ddCheck = _ddCheck.safe;
break;

View File

@ -1590,6 +1590,7 @@ void ReadYourWritesTransaction::atomicOp( const KeyRef& key, const ValueRef& ope
}
if(operationType == MutationRef::SetVersionstampedKey) {
TEST(options.readYourWritesDisabled); // SetVersionstampedKey without ryw enabled
// this does validation of the key and needs to be performed before the readYourWritesDisabled path
KeyRangeRef range = getVersionstampKeyRange(arena, k, tr.getCachedReadVersion().orDefault(0), getMaxReadKey());
if(!options.readYourWritesDisabled) {

View File

@ -466,29 +466,26 @@ struct RestoreSendVersionedMutationsRequest : TimedRequest {
Version msgIndex; // Monitonically increasing index of mutation messages
bool isRangeFile;
MutationsVec mutations; // Mutations that may be at different versions parsed by one loader
LogMessageVersionVec mVersions; // (version, subversion) of each mutation in mutations field
VersionedMutationsVec versionedMutations; // Versioned mutations may be at different versions parsed by one loader
ReplyPromise<RestoreCommonReply> reply;
RestoreSendVersionedMutationsRequest() = default;
explicit RestoreSendVersionedMutationsRequest(int batchIndex, const RestoreAsset& asset, Version msgIndex,
bool isRangeFile, MutationsVec mutations,
LogMessageVersionVec mVersions)
: batchIndex(batchIndex), asset(asset), msgIndex(msgIndex), isRangeFile(isRangeFile), mutations(mutations),
mVersions(mVersions) {}
bool isRangeFile, VersionedMutationsVec versionedMutations)
: batchIndex(batchIndex), asset(asset), msgIndex(msgIndex), isRangeFile(isRangeFile),
versionedMutations(versionedMutations) {}
std::string toString() {
std::stringstream ss;
ss << "VersionBatchIndex:" << batchIndex << "RestoreAsset:" << asset.toString() << " msgIndex:" << msgIndex
<< " isRangeFile:" << isRangeFile << " mutations.size:" << mutations.size()
<< " mVersions.size:" << mVersions.size();
<< " isRangeFile:" << isRangeFile << " versionedMutations.size:" << versionedMutations.size();
return ss.str();
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, batchIndex, asset, msgIndex, isRangeFile, mutations, mVersions, reply);
serializer(ar, batchIndex, asset, msgIndex, isRangeFile, versionedMutations, reply);
}
};

View File

@ -1,248 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="14.1" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(SolutionDir)versions.target" />
<PropertyGroup Condition="'$(Release)' != 'true' ">
<PreReleaseDecoration>-PRERELEASE</PreReleaseDecoration>
</PropertyGroup>
<PropertyGroup Condition="'$(Release)' == 'true' ">
<PreReleaseDecoration>
</PreReleaseDecoration>
<PreprocessorDefinitions>FDB_CLEAN_BUILD;%(PreprocessorDefinitions)</PreprocessorDefinitions>
</PropertyGroup>
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Debug|X64">
<Configuration>Debug</Configuration>
<Platform>X64</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|X64">
<Configuration>Release</Configuration>
<Platform>X64</Platform>
</ProjectConfiguration>
</ItemGroup>
<ItemGroup>
<ActorCompiler Include="AsyncFileBlobStore.actor.h">
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>
</ActorCompiler>
<ClInclude Include="Atomic.h" />
<ClInclude Include="BackupContainer.h" />
<ActorCompiler Include="BackupAgent.actor.h">
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>
</ActorCompiler>
<ClInclude Include="BlobStore.h" />
<ClInclude Include="ClientLogEvents.h" />
<ClInclude Include="ClientWorkerInterface.h" />
<ClInclude Include="ClusterInterface.h" />
<ClInclude Include="CommitTransaction.h" />
<ClInclude Include="CoordinationInterface.h" />
<ClInclude Include="DatabaseConfiguration.h" />
<ClInclude Include="DatabaseContext.h" />
<ActorCompiler Include="EventTypes.actor.h">
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>
</ActorCompiler>
<ClInclude Include="FDBOptions.g.h" />
<ClInclude Include="FDBOptions.h" />
<ClInclude Include="FDBTypes.h" />
<ClInclude Include="HTTP.h" />
<ClInclude Include="KeyBackedTypes.h" />
<ClInclude Include="MetricLogger.h" />
<ClInclude Include="IClientApi.h" />
<ClInclude Include="JsonBuilder.h" />
<ClInclude Include="JSONDoc.h" />
<ClInclude Include="json_spirit\json_spirit_error_position.h" />
<ClInclude Include="json_spirit\json_spirit_reader_template.h" />
<ClInclude Include="json_spirit\json_spirit_value.h" />
<ClInclude Include="json_spirit\json_spirit_writer_options.h" />
<ClInclude Include="json_spirit\json_spirit_writer_template.h" />
<ClInclude Include="KeyRangeMap.h" />
<ClInclude Include="Knobs.h" />
<ClInclude Include="libb64\cdecode.h" />
<ClInclude Include="libb64\cencode.h" />
<ClInclude Include="libb64\decode.h" />
<ClInclude Include="libb64\encode.h" />
<ActorCompiler Include="ManagementAPI.actor.h">
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>
</ActorCompiler>
<ClInclude Include="MasterProxyInterface.h" />
<ClInclude Include="md5\md5.h" />
<ClInclude Include="MonitorLeader.h" />
<ClInclude Include="MultiVersionAssignmentVars.h" />
<ClInclude Include="MultiVersionTransaction.h" />
<ClInclude Include="MutationList.h" />
<ActorCompiler Include="NativeAPI.actor.h">
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>
</ActorCompiler>
<ClInclude Include="Notified.h" />
<ClInclude Include="ReadYourWrites.h" />
<ActorCompiler Include="RunTransaction.actor.h" />
<ClInclude Include="RYWIterator.h" />
<ClInclude Include="Schemas.h" />
<ClInclude Include="sha1\SHA1.h" />
<ClInclude Include="SnapshotCache.h" />
<ActorCompiler Include="SpecialKeySpace.actor.h" />
<ClInclude Include="Status.h" />
<ClInclude Include="StatusClient.h" />
<ClInclude Include="StorageServerInterface.h" />
<ClInclude Include="Subspace.h" />
<ClInclude Include="SystemData.h" />
<ActorCompiler Include="RestoreWorkerInterface.actor.h">
<EnableCompile>false</EnableCompile>
</ActorCompiler>
<ClInclude Include="TaskBucket.h" />
<ClInclude Include="ThreadSafeTransaction.h" />
<ClInclude Include="Tuple.h" />
<ActorCompiler Include="VersionedMap.actor.h">
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">false</EnableCompile>
<EnableCompile Condition="'$(Configuration)|$(Platform)'=='Release|X64'">false</EnableCompile>
</ActorCompiler>
<ClInclude Include="VersionedMap.h" />
<ClInclude Include="WriteMap.h" />
<ClInclude Include="zipf.h" />
</ItemGroup>
<ItemGroup>
<ActorCompiler Include="AsyncFileBlobStore.actor.cpp" />
<ClCompile Include="AutoPublicAddress.cpp" />
<ActorCompiler Include="BackupAgentBase.actor.cpp" />
<ActorCompiler Include="BackupContainer.actor.cpp" />
<ActorCompiler Include="BlobStore.actor.cpp" />
<ActorCompiler Include="DatabaseBackupAgent.actor.cpp" />
<ClCompile Include="DatabaseConfiguration.cpp" />
<ClCompile Include="FDBOptions.g.cpp" />
<ActorCompiler Include="FileBackupAgent.actor.cpp" />
<ActorCompiler Include="HTTP.actor.cpp" />
<ActorCompiler Include="KeyRangeMap.actor.cpp" />
<ClCompile Include="Knobs.cpp" />
<ClCompile Include="libb64\cdecode.c" />
<ClCompile Include="libb64\cencode.c" />
<ClCompile Include="md5\md5.c" />
<ActorCompiler Include="MetricLogger.actor.cpp" />
<ActorCompiler Include="MonitorLeader.actor.cpp" />
<ActorCompiler Include="ManagementAPI.actor.cpp" />
<ActorCompiler Include="MultiVersionTransaction.actor.cpp" />
<ActorCompiler Include="NativeAPI.actor.cpp" />
<ActorCompiler Include="ReadYourWrites.actor.cpp" />
<ClCompile Include="RYWIterator.cpp" />
<ActorCompiler Include="StatusClient.actor.cpp" />
<ClCompile Include="Schemas.cpp" />
<ClCompile Include="SystemData.cpp" />
<ClCompile Include="sha1\SHA1.cpp" />
<ActorCompiler Include="SpecialKeySpace.actor.cpp" />
<ActorCompiler Include="ThreadSafeTransaction.actor.cpp" />
<ActorCompiler Include="TaskBucket.actor.cpp" />
<ClCompile Include="Subspace.cpp" />
<ClCompile Include="Tuple.cpp" />
<ClCompile Include="JsonBuilder.cpp" />
<ClCompile Include="zipf.c" />
</ItemGroup>
<PropertyGroup Label="Globals">
<ProjectGUID>{E2939DAA-238E-4970-96C4-4C57980F93BD}</ProjectGUID>
<TargetFrameworkVersion>v4.5.2</TargetFrameworkVersion>
<Keyword>Win32Proj</Keyword>
<RootNamespace>flow</RootNamespace>
</PropertyGroup>
<PropertyGroup>
<OutDir>$(SolutionDir)bin\$(Configuration)\</OutDir>
<IntDir>$(SystemDrive)\temp\msvcfdb\$(Platform)$(Configuration)\$(MSBuildProjectName)\</IntDir>
<BuildLogFile>$(IntDir)\$(MSBuildProjectName).log</BuildLogFile>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|X64'" Label="Configuration">
<ConfigurationType>StaticLibrary</ConfigurationType>
<CharacterSet>MultiByte</CharacterSet>
<PlatformToolset>v141</PlatformToolset>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|X64'" Label="Configuration">
<ConfigurationType>StaticLibrary</ConfigurationType>
<CharacterSet>MultiByte</CharacterSet>
<PlatformToolset>v141</PlatformToolset>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
<ImportGroup Label="ExtensionSettings">
</ImportGroup>
<ImportGroup Label="PropertySheets">
<Import Project="$(LocalAppData)\Microsoft\VisualStudio\10.0\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(LocalAppData)\Microsoft\VisualStudio\10.0\Microsoft.Cpp.$(Platform).user.props')" />
</ImportGroup>
<PropertyGroup Label="UserMacros" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">
<LinkIncremental>true</LinkIncremental>
<IncludePath>$(IncludePath);../;C:\Program Files\boost_1_67_0</IncludePath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|X64'">
<LinkIncremental>false</LinkIncremental>
<IncludePath>$(IncludePath);../;C:\Program Files\boost_1_67_0</IncludePath>
</PropertyGroup>
<ItemDefinitionGroup>
<ClCompile>
<PreprocessorDefinitions>FDB_VT_VERSION="$(Version)$(PreReleaseDecoration)";FDB_VT_PACKAGE_NAME="$(PackageName)";%(PreprocessorDefinitions)</PreprocessorDefinitions>
<LanguageStandard>stdcpp17</LanguageStandard>
</ClCompile>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|X64'">
<ClCompile>
<PrecompiledHeader>
</PrecompiledHeader>
<WarningLevel>Level3</WarningLevel>
<MinimalRebuild>false</MinimalRebuild>
<DebugInformationFormat>ProgramDatabase</DebugInformationFormat>
<Optimization>Disabled</Optimization>
<BasicRuntimeChecks>EnableFastChecks</BasicRuntimeChecks>
<RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
<PreprocessorDefinitions>TLS_DISABLED;WIN32;_WIN32_WINNT=0x0502;WINVER=0x0502;BOOST_ALL_NO_LIB;NTDDI_VERSION=0x05020000;_DEBUG;_HAS_ITERATOR_DEBUGGING=0;_CONSOLE;_CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<AdditionalIncludeDirectories>%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<AdditionalOptions>/bigobj @../flow/no_intellisense.opt %(AdditionalOptions)</AdditionalOptions>
<LanguageStandard>stdcpp17</LanguageStandard>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
<AdditionalDependencies>Advapi32.lib</AdditionalDependencies>
</Link>
<Lib>
<AdditionalDependencies>$(TargetDir)flow.lib;$(TargetDir)fdbrpc.lib;winmm.lib</AdditionalDependencies>
</Lib>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|X64'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<PrecompiledHeader>
</PrecompiledHeader>
<DebugInformationFormat>ProgramDatabase</DebugInformationFormat>
<Optimization>Full</Optimization>
<RuntimeLibrary>MultiThreaded</RuntimeLibrary>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>TLS_DISABLED;WIN32;_WIN32_WINNT=0x0502;WINVER=0x0502;BOOST_ALL_NO_LIB;NTDDI_VERSION=0x05020000;NDEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<AdditionalIncludeDirectories>%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<EnableEnhancedInstructionSet>NotSet</EnableEnhancedInstructionSet>
<EnablePREfast>false</EnablePREfast>
<AdditionalOptions>/bigobj @../flow/no_intellisense.opt %(AdditionalOptions)</AdditionalOptions>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
<MinimalRebuild>false</MinimalRebuild>
<LanguageStandard>stdcpp17</LanguageStandard>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
<EnableCOMDATFolding>false</EnableCOMDATFolding>
<OptimizeReferences>false</OptimizeReferences>
<LinkTimeCodeGeneration>Default</LinkTimeCodeGeneration>
<AdditionalDependencies>Advapi32.lib</AdditionalDependencies>
<AdditionalOptions>/LTCG %(AdditionalOptions)</AdditionalOptions>
</Link>
<Lib>
<AdditionalDependencies>$(TargetDir)flow.lib;$(TargetDir)fdbrpc.lib;winmm.lib</AdditionalDependencies>
</Lib>
</ItemDefinitionGroup>
<ImportGroup Label="ExtensionTargets">
<Import Project="..\flow\actorcompiler\ActorCompiler.targets" />
</ImportGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<Target Name="MyPreCompileSteps" AfterTargets="CLCompile">
<Exec Command="&quot;$(SolutionDir)bin\$(Configuration)\coveragetool.exe&quot; &quot;$(OutDir)coverage.$(TargetName).xml&quot; @(ActorCompiler -> '%(RelativeDir)%(Filename)%(Extension)', ' ') @(CLInclude -> '%(RelativeDir)%(Filename)%(Extension)', ' ') @(CLCompile -> '%(RelativeDir)%(Filename)%(Extension)', ' ')" />
</Target>
</Project>

View File

@ -37,6 +37,11 @@
#include <linux/limits.h>
#endif
#ifdef __FreeBSD__
#include <sys/event.h>
#define O_EVTONLY O_RDONLY
#endif
#ifdef __APPLE__
#include <sys/event.h>
#include <mach/mach.h>
@ -78,7 +83,7 @@
#ifdef __linux__
typedef fd_set* fdb_fd_set;
#elif defined __APPLE__
#elif defined(__APPLE__) || defined(__FreeBSD__)
typedef int fdb_fd_set;
#endif
@ -89,7 +94,7 @@ void monitor_fd( fdb_fd_set list, int fd, int* maxfd, void* cmd ) {
FD_SET( fd, list );
if ( fd > *maxfd )
*maxfd = fd;
#elif defined __APPLE__
#elif defined(__APPLE__) || defined(__FreeBSD__)
/* ignore maxfd */
struct kevent ev;
EV_SET( &ev, fd, EVFILT_READ, EV_ADD, 0, 0, cmd );
@ -100,7 +105,7 @@ void monitor_fd( fdb_fd_set list, int fd, int* maxfd, void* cmd ) {
void unmonitor_fd( fdb_fd_set list, int fd ) {
#ifdef __linux__
FD_CLR( fd, list );
#elif defined __APPLE__
#elif defined(__APPLE__) || defined(__FreeBSD__)
struct kevent ev;
EV_SET( &ev, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL );
kevent( list, &ev, 1, NULL, 0, NULL ); // FIXME: check?
@ -194,7 +199,7 @@ const char* get_value_multi(const CSimpleIni& ini, const char* key, ...) {
}
double timer() {
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return double(ts.tv_sec) + (ts.tv_nsec * 1e-9);
@ -913,7 +918,7 @@ void read_child_output( Command* cmd, int pipe_idx, fdb_fd_set fds ) {
}
}
#ifdef __APPLE__
#if defined(__APPLE__) || defined(__FreeBSD__)
void watch_conf_dir( int kq, int* confd_fd, std::string confdir ) {
struct kevent ev;
std::string original = confdir;
@ -1171,7 +1176,11 @@ int main(int argc, char** argv) {
// testPathOps(); return -1;
std::string lockfile = "/var/run/fdbmonitor.pid";
#ifdef __FreeBSD__
std::string _confpath = "/usr/local/etc/foundationdb/foundationdb.conf";
#else
std::string _confpath = "/etc/foundationdb/foundationdb.conf";
#endif
std::vector<const char *> additional_watch_paths;
@ -1266,12 +1275,12 @@ int main(int argc, char** argv) {
#endif
if (daemonize) {
#ifdef __APPLE__
#if defined(__APPLE__) || defined(__FreeBSD__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif
if (daemon(0, 0)) {
#ifdef __APPLE__
#if defined(__APPLE__) || defined(__FreeBSD__)
#pragma GCC diagnostic pop
#endif
log_err("daemon", errno, "Unable to daemonize");
@ -1330,7 +1339,7 @@ int main(int argc, char** argv) {
signal(SIGHUP, signal_handler);
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
#elif defined(__APPLE__)
#elif defined(__APPLE__) || defined(__FreeBSD__)
int kq = kqueue();
if ( kq < 0 ) {
log_err( "kqueue", errno, "Unable to create kqueue" );
@ -1375,11 +1384,11 @@ int main(int argc, char** argv) {
/* normal will be restored in our main loop in the call to
pselect, but none blocks all signals while processing events */
sigprocmask(SIG_SETMASK, &full_mask, &normal_mask);
#elif defined(__APPLE__)
#elif defined(__APPLE__) || defined(__FreeBSD__)
sigprocmask(0, NULL, &normal_mask);
#endif
#ifdef __APPLE__
#if defined(__APPLE__) || defined(__FreeBSD__)
struct stat st_buf;
struct timespec mtimespec;
@ -1438,7 +1447,7 @@ int main(int argc, char** argv) {
load_conf(confpath.c_str(), uid, gid, &normal_mask, &rfds, &maxfd);
reload_additional_watches = false;
#elif defined(__APPLE__)
#elif defined(__APPLE__) || defined(__FreeBSD__)
load_conf( confpath.c_str(), uid, gid, &normal_mask, watched_fds, &maxfd );
watch_conf_file( kq, &conff_fd, confpath.c_str() );
watch_conf_dir( kq, &confd_fd, confdir );
@ -1476,7 +1485,7 @@ int main(int argc, char** argv) {
if(nfds == 0) {
reload = true;
}
#elif defined(__APPLE__)
#elif defined(__APPLE__) || defined(__FreeBSD__)
int nev = 0;
if(timeout < 0) {
nev = kevent( kq, NULL, 0, &ev, 1, NULL );

View File

@ -49,7 +49,16 @@ if(APPLE)
list(APPEND FDBRPC_THIRD_PARTY_SRCS libcoroutine/asm.S)
endif()
if(NOT WIN32)
list(APPEND FDBRPC_THIRD_PARTY_SRCS libcoroutine/context.c libeio/eio.c)
if(CMAKE_SYSTEM_NAME STREQUAL "FreeBSD")
find_library(EIO eio)
if(EIO)
list(APPEND FDBRPC_THIRD_PARTY_SRCS libcoroutine/context.c)
else()
list(APPEND FDBRPC_THIRD_PARTY_SRCS libcoroutine/context.c libeio/eio.c)
endif()
else()
list(APPEND FDBRPC_THIRD_PARTY_SRCS libcoroutine/context.c libeio/eio.c)
endif()
endif()
add_library(thirdparty STATIC ${FDBRPC_THIRD_PARTY_SRCS})

View File

@ -73,8 +73,8 @@ struct LoadBalancedReply {
LoadBalancedReply() : penalty(1.0) {}
};
Optional<LoadBalancedReply> getLoadBalancedReply(LoadBalancedReply *reply);
Optional<LoadBalancedReply> getLoadBalancedReply(void*);
Optional<LoadBalancedReply> getLoadBalancedReply(const LoadBalancedReply *reply);
Optional<LoadBalancedReply> getLoadBalancedReply(const void*);
// Returns true if we got a value for our request
// Throws an error if the request returned an error that should bubble out
@ -455,6 +455,103 @@ Future< REPLY_TYPE(Request) > loadBalance(
}
}
// Subclasses must initialize all members in their default constructors
// Subclasses must serialize all members
struct BasicLoadBalancedReply {
int recentRequests;
BasicLoadBalancedReply() : recentRequests(0) {}
};
Optional<BasicLoadBalancedReply> getBasicLoadBalancedReply(const BasicLoadBalancedReply *reply);
Optional<BasicLoadBalancedReply> getBasicLoadBalancedReply(const void*);
// A simpler version of LoadBalance that does not send second requests where the list of servers are always fresh
ACTOR template <class Interface, class Request, class Multi>
Future< REPLY_TYPE(Request) > basicLoadBalance(
Reference<ModelInterface<Multi>> alternatives,
RequestStream<Request> Interface::* channel,
Request request = Request(),
TaskPriority taskID = TaskPriority::DefaultPromiseEndpoint,
bool atMostOnce = false)
{
setReplyPriority(request, taskID);
if (!alternatives)
return Never();
ASSERT( alternatives->size() && alternatives->alwaysFresh() );
state int bestAlt = alternatives->getBest();
state int nextAlt = deterministicRandom()->randomInt(0, std::max(alternatives->size() - 1,1));
if( nextAlt >= bestAlt )
nextAlt++;
state int startAlt = nextAlt;
state int startDistance = (bestAlt+alternatives->size()-startAlt) % alternatives->size();
state int numAttempts = 0;
state double backoff = 0;
state int useAlt;
loop {
// Find an alternative, if any, that is not failed, starting with nextAlt
state RequestStream<Request> const* stream = NULL;
for(int alternativeNum=0; alternativeNum<alternatives->size(); alternativeNum++) {
useAlt = nextAlt;
if( nextAlt == startAlt )
useAlt = bestAlt;
else if( (nextAlt+alternatives->size()-startAlt) % alternatives->size() <= startDistance )
useAlt = (nextAlt+alternatives->size()-1) % alternatives->size();
stream = &alternatives->get( useAlt, channel );
if (!IFailureMonitor::failureMonitor().getState( stream->getEndpoint() ).failed)
break;
nextAlt = (nextAlt+1) % alternatives->size();
stream=NULL;
}
if(!stream) {
// Everything is down! Wait for someone to be up.
vector<Future<Void>> ok( alternatives->size() );
for(int i=0; i<ok.size(); i++) {
ok[i] = IFailureMonitor::failureMonitor().onStateEqual( alternatives->get(i, channel).getEndpoint(), FailureStatus(false) );
}
wait( quorum( ok, 1 ) );
numAttempts = 0; // now that we've got a server back, reset the backoff
} else {
if(backoff > 0.0) {
wait(delay(backoff));
}
ErrorOr<REPLY_TYPE(Request)> result = wait(stream->tryGetReply(request));
if(result.present()) {
Optional<BasicLoadBalancedReply> loadBalancedReply = getBasicLoadBalancedReply(&result.get());
if(loadBalancedReply.present()) {
alternatives->updateRecent( useAlt, loadBalancedReply.get().recentRequests );
}
return result.get();
}
if(result.getError().code() != error_code_broken_promise && result.getError().code() != error_code_request_maybe_delivered) {
throw result.getError();
}
if(atMostOnce) {
throw request_maybe_delivered();
}
if(++numAttempts >= alternatives->size()) {
backoff = std::min(FLOW_KNOBS->LOAD_BALANCE_MAX_BACKOFF, std::max(FLOW_KNOBS->LOAD_BALANCE_START_BACKOFF, backoff * FLOW_KNOBS->LOAD_BALANCE_BACKOFF_RATE));
}
}
nextAlt = (nextAlt+1) % alternatives->size();
resetReply(request, taskID);
}
}
#include "flow/unactorcompiler.h"
#endif

View File

@ -58,50 +58,111 @@ struct ReferencedInterface : public ReferenceCounted<ReferencedInterface<T>> {
};
template <class T>
class MultiInterface : public ReferenceCounted<MultiInterface<T>> {
struct AlternativeInfo {
T interf;
double probability;
double cumulativeProbability;
int recentRequests;
double lastUpdate;
AlternativeInfo(T const& interf, double probability, double cumulativeProbability) : interf(interf), probability(probability), cumulativeProbability(cumulativeProbability), recentRequests(-1), lastUpdate(0) {}
bool operator < (double const& r) const {
return cumulativeProbability < r;
}
bool operator <= (double const& r) const {
return cumulativeProbability <= r;
}
bool operator == (double const& r) const {
return cumulativeProbability == r;
}
};
template <class T>
class ModelInterface : public ReferenceCounted<ModelInterface<T>> {
public:
MultiInterface( const vector<T>& v, LocalityData const& locality = LocalityData() ) : bestCount(0) {
for(int i=0; i<v.size(); i++)
alternatives.push_back(KVPair<int,T>(LBDistance::DISTANT,v[i]));
deterministicRandom()->randomShuffle(alternatives);
if ( LBLocalityData<T>::Present ) {
for(int a=0; a<alternatives.size(); a++)
alternatives[a].k = loadBalanceDistance( locality, LBLocalityData<T>::getLocality( alternatives[a].v ), LBLocalityData<T>::getAddress( alternatives[a].v ) );
std::stable_sort( alternatives.begin(), alternatives.end() );
ModelInterface( const vector<T>& v ) {
for(int i = 0; i < v.size(); i++) {
alternatives.push_back(AlternativeInfo(v[i], 1.0/v.size(), (i+1.0)/v.size()));
}
if(v.size()) {
updater = recurring([this](){ updateProbabilities(); }, FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE);
}
if(size())
bestCount = std::lower_bound( alternatives.begin()+1, alternatives.end(), alternatives[0].k+1 ) - alternatives.begin();
}
int size() const { return alternatives.size(); }
int countBest() const {
return bestCount;
}
LBDistance::Type bestDistance() const {
if( !size() )
return LBDistance::DISTANT;
return (LBDistance::Type) alternatives[0].k;
}
bool alwaysFresh() const {
return LBLocalityData<T>::alwaysFresh();
}
template <class F>
F const& get( int index, F T::*member ) const {
return alternatives[index].v.*member;
int getBest() const {
return std::lower_bound( alternatives.begin(), alternatives.end(), deterministicRandom()->random01() ) - alternatives.begin();
}
T const& getInterface(int index) { return alternatives[index].v; }
UID getId( int index ) const { return alternatives[index].v.id(); }
void updateRecent( int index, int recentRequests ) {
alternatives[index].recentRequests = recentRequests;
alternatives[index].lastUpdate = now();
}
virtual ~MultiInterface() {}
void updateProbabilities() {
double totalRequests = 0;
for(auto& it : alternatives) {
totalRequests += it.recentRequests;
if(now() - it.lastUpdate > FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/2.0) {
return;
}
}
if(totalRequests < 1000) {
return;
}
double totalProbability = 0;
for(auto& it : alternatives) {
it.probability += (1.0/alternatives.size()-(it.recentRequests/totalRequests))*FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_CHANGE;
it.probability = std::max(it.probability, 1/(FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_PROB*alternatives.size()));
it.probability = std::min(it.probability, FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_PROB/alternatives.size());
totalProbability += it.probability;
}
for(auto& it : alternatives) {
it.probability = it.probability/totalProbability;
}
totalProbability = 0;
for(auto& it : alternatives) {
totalProbability += it.probability;
it.cumulativeProbability = totalProbability;
}
alternatives.back().cumulativeProbability = 1.0;
}
template <class F>
F const& get( int index, F T::*member ) const {
return alternatives[index].interf.*member;
}
T const& getInterface(int index) { return alternatives[index].interf; }
UID getId( int index ) const { return alternatives[index].interf.id(); }
virtual ~ModelInterface() {}
std::string description() {
return describe( alternatives );
}
private:
vector<KVPair<int,T>> alternatives;
int16_t bestCount;
vector<AlternativeInfo<T>> alternatives;
Future<Void> updater;
};
template <class T>
class MultiInterface : public ReferenceCounted<MultiInterface<T>> {
MultiInterface( const vector<T>& v, LocalityData const& locality = LocalityData() ) {
//This version of MultInterface is no longer used, but was kept around because of templating
ASSERT(false);
}
virtual ~MultiInterface() {}
};
template <class T>
@ -155,5 +216,6 @@ private:
};
template <class Ar, class T> void load(Ar& ar, Reference<MultiInterface<T>>&) { ASSERT(false); } //< required for Future<T>
template <class Ar, class T> void load(Ar& ar, Reference<ModelInterface<T>>&) { ASSERT(false); } //< required for Future<T>
#endif

View File

@ -57,14 +57,22 @@ double QueueModel::addRequest( uint64_t id ) {
return d.penalty;
}
Optional<LoadBalancedReply> getLoadBalancedReply(LoadBalancedReply *reply) {
Optional<LoadBalancedReply> getLoadBalancedReply(const LoadBalancedReply *reply) {
return *reply;
}
Optional<LoadBalancedReply> getLoadBalancedReply(void*) {
Optional<LoadBalancedReply> getLoadBalancedReply(const void*) {
return Optional<LoadBalancedReply>();
}
Optional<BasicLoadBalancedReply> getBasicLoadBalancedReply(const BasicLoadBalancedReply *reply) {
return *reply;
}
Optional<BasicLoadBalancedReply> getBasicLoadBalancedReply(const void*) {
return Optional<BasicLoadBalancedReply>();
}
/*
void QueueModel::addMeasurement( uint64_t id, QueueDetails qd ){
if (data[new_index].count(id))

142
fdbrpc/libeio/config.h.FreeBSD Executable file
View File

@ -0,0 +1,142 @@
/* config.h. Generated from config.h.in by configure. */
/* config.h.in. Generated from configure.ac by autoheader. */
/* Define to 1 if you have the <dlfcn.h> header file. */
#define HAVE_DLFCN_H 1
/* fdatasync(2) is available */
#define HAVE_FDATASYNC 1
/* futimes(2) is available */
#define HAVE_FUTIMES 1
/* Define to 1 if you have the <inttypes.h> header file. */
#define HAVE_INTTYPES_H 1
/* fallocate(2) is available */
/* #undef HAVE_LINUX_FALLOCATE */
/* Define to 1 if you have the <linux/fiemap.h> header file. */
/* #undef HAVE_LINUX_FIEMAP_H */
/* Define to 1 if you have the <linux/fs.h> header file. */
/* #undef HAVE_LINUX_FS_H */
/* splice/vmsplice/tee(2) are available */
/* #undef HAVE_LINUX_SPLICE */
/* Define to 1 if you have the <memory.h> header file. */
#define HAVE_MEMORY_H 1
/* posix_fadvise(2) is available */
#define HAVE_POSIX_FADVISE 1
/* posix_madvise(2) is available */
#define HAVE_POSIX_MADVISE 1
/* prctl(PR_SET_NAME) is available */
/* #undef HAVE_PRCTL_SET_NAME */
/* readahead(2) is available (linux) */
/* #undef HAVE_READAHEAD */
/* sendfile(2) is available and supported */
#define HAVE_SENDFILE 1
/* Define to 1 if you have the <stdint.h> header file. */
#define HAVE_STDINT_H 1
/* Define to 1 if you have the <stdlib.h> header file. */
#define HAVE_STDLIB_H 1
/* Define to 1 if you have the <strings.h> header file. */
#define HAVE_STRINGS_H 1
/* Define to 1 if you have the <string.h> header file. */
#define HAVE_STRING_H 1
/* sync_file_range(2) is available */
/* #undef HAVE_SYNC_FILE_RANGE */
/* Define to 1 if you have the <sys/prctl.h> header file. */
/* #undef HAVE_SYS_PRCTL_H */
/* Define to 1 if you have the <sys/stat.h> header file. */
#define HAVE_SYS_STAT_H 1
/* syscall(__NR_syncfs) is available */
/* #undef HAVE_SYS_SYNCFS */
/* Define to 1 if you have the <sys/syscall.h> header file. */
#define HAVE_SYS_SYSCALL_H 1
/* Define to 1 if you have the <sys/types.h> header file. */
#define HAVE_SYS_TYPES_H 1
/* Define to 1 if you have the <unistd.h> header file. */
#define HAVE_UNISTD_H 1
/* utimes(2) is available */
#define HAVE_UTIMES 1
/* Define to the sub-directory where libtool stores uninstalled libraries. */
#define LT_OBJDIR ".libs/"
/* Name of package */
#define PACKAGE "libeio"
/* Define to the address where bug reports for this package should be sent. */
#define PACKAGE_BUGREPORT ""
/* Define to the full name of this package. */
#define PACKAGE_NAME ""
/* Define to the full name and version of this package. */
#define PACKAGE_STRING ""
/* Define to the one symbol short name of this package. */
#define PACKAGE_TARNAME ""
/* Define to the home page for this package. */
#define PACKAGE_URL ""
/* Define to the version of this package. */
#define PACKAGE_VERSION ""
/* Define to 1 if you have the ANSI C header files. */
#define STDC_HEADERS 1
/* Enable extensions on AIX 3, Interix. */
#ifndef _ALL_SOURCE
# define _ALL_SOURCE 1
#endif
/* Enable GNU extensions on systems that have them. */
#ifndef _GNU_SOURCE
# define _GNU_SOURCE 1
#endif
/* Enable threading extensions on Solaris. */
#ifndef _POSIX_PTHREAD_SEMANTICS
# define _POSIX_PTHREAD_SEMANTICS 1
#endif
/* Enable extensions on HP NonStop. */
#ifndef _TANDEM_SOURCE
# define _TANDEM_SOURCE 1
#endif
/* Enable general extensions on Solaris. */
#ifndef __EXTENSIONS__
# define __EXTENSIONS__ 1
#endif
/* Version number of package */
#define VERSION "1.0"
/* Define to 1 if on MINIX. */
/* #undef _MINIX */
/* Define to 2 if the system does not provide POSIX.1 features except with
this defined. */
/* #undef _POSIX_1_SOURCE */
/* Define to 1 if you need to in order for `stat' and other things to work. */
/* #undef _POSIX_SOURCE */

View File

@ -39,6 +39,8 @@
#ifdef __linux__
#include "config.h.linux"
#elif defined(__FreeBSD__)
#include "config.h.FreeBSD"
#elif defined(__APPLE__)
#include "config.h.osx"
#endif

View File

@ -395,7 +395,7 @@ struct BackupData {
GetReadVersionRequest::FLAG_USE_MIN_KNOWN_COMMITTED_VERSION);
choose {
when(wait(self->cx->onMasterProxiesChanged())) {}
when(GetReadVersionReply reply = wait(loadBalance(self->cx->getMasterProxies(false),
when(GetReadVersionReply reply = wait(basicLoadBalance(self->cx->getMasterProxies(false),
&MasterProxyInterface::getConsistentReadVersion,
request, self->cx->taskID))) {
return reply.version;

View File

@ -4414,12 +4414,12 @@ ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo>> db,
loop {
wait( delay(SERVER_KNOBS->METRIC_UPDATE_RATE) );
state Reference<ProxyInfo> proxies(new ProxyInfo(db->get().client.proxies, db->get().myLocality));
state Reference<ProxyInfo> proxies(new ProxyInfo(db->get().client.proxies));
choose {
when (wait(db->onChange())) {}
when (GetHealthMetricsReply reply = wait(proxies->size() ?
loadBalance(proxies, &MasterProxyInterface::getHealthMetrics, GetHealthMetricsRequest(false))
basicLoadBalance(proxies, &MasterProxyInterface::getHealthMetrics, GetHealthMetricsRequest(false))
: Never())) {
if (reply.healthMetrics.batchLimited) {
*lastLimited = now();

View File

@ -587,7 +587,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
// Fast Restore
init( FASTRESTORE_FAILURE_TIMEOUT, 3600 );
init( FASTRESTORE_HEARTBEAT_INTERVAL, 60 );
init( FASTRESTORE_SAMPLING_PERCENT, 1 ); if( randomize && BUGGIFY ) { FASTRESTORE_SAMPLING_PERCENT = deterministicRandom()->random01() * 100; }
init( FASTRESTORE_SAMPLING_PERCENT, 80 ); if( randomize && BUGGIFY ) { FASTRESTORE_SAMPLING_PERCENT = deterministicRandom()->random01() * 100; }
init( FASTRESTORE_NUM_LOADERS, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_LOADERS = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_NUM_APPLIERS, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_APPLIERS = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_TXN_BATCH_MAX_BYTES, 512.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_BATCH_MAX_BYTES = deterministicRandom()->random01() * 1024.0 * 1024.0 + 1.0; }
@ -595,13 +595,13 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( FASTRESTORE_VB_PARALLELISM, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_PARALLELISM = deterministicRandom()->random01() * 20 + 1; }
init( FASTRESTORE_VB_MONITOR_DELAY, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_MONITOR_DELAY = deterministicRandom()->random01() * 20 + 1; }
init( FASTRESTORE_VB_LAUNCH_DELAY, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_LAUNCH_DELAY = deterministicRandom()->random01() * 60 + 1; }
init( FASTRESTORE_ROLE_LOGGING_DELAY, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_ROLE_LOGGING_DELAY = deterministicRandom()->random01() * 60 + 1; }
init( FASTRESTORE_ROLE_LOGGING_DELAY, 60 ); if( randomize && BUGGIFY ) { FASTRESTORE_ROLE_LOGGING_DELAY = deterministicRandom()->random01() * 60 + 1; }
init( FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL = deterministicRandom()->random01() * 60 + 1; }
init( FASTRESTORE_ATOMICOP_WEIGHT, 100 ); if( randomize && BUGGIFY ) { FASTRESTORE_ATOMICOP_WEIGHT = deterministicRandom()->random01() * 200 + 1; }
init( FASTRESTORE_APPLYING_PARALLELISM, 100 ); if( randomize && BUGGIFY ) { FASTRESTORE_APPLYING_PARALLELISM = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_MONITOR_LEADER_DELAY, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_MONITOR_LEADER_DELAY = deterministicRandom()->random01() * 100; }
init( FASTRESTORE_STRAGGLER_THRESHOLD_SECONDS, 60 ); if( randomize && BUGGIFY ) { FASTRESTORE_STRAGGLER_THRESHOLD_SECONDS = deterministicRandom()->random01() * 240 + 10; }
init( FASTRESTORE_TRACK_REQUEST_LATENCY, true ); if( randomize && BUGGIFY ) { FASTRESTORE_TRACK_REQUEST_LATENCY = false; }
init( FASTRESTORE_TRACK_REQUEST_LATENCY, false ); if( randomize && BUGGIFY ) { FASTRESTORE_TRACK_REQUEST_LATENCY = false; }
init( FASTRESTORE_TRACK_LOADER_SEND_REQUESTS, false ); if( randomize && BUGGIFY ) { FASTRESTORE_TRACK_LOADER_SEND_REQUESTS = true; }
init( FASTRESTORE_MEMORY_THRESHOLD_MB_SOFT, 6144 ); if( randomize && BUGGIFY ) { FASTRESTORE_MEMORY_THRESHOLD_MB_SOFT = 1; }
init( FASTRESTORE_WAIT_FOR_MEMORY_LATENCY, 10 ); if( randomize && BUGGIFY ) { FASTRESTORE_WAIT_FOR_MEMORY_LATENCY = 60; }
@ -610,6 +610,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( FASTRESTORE_APPLIER_FETCH_KEYS_SIZE, 100 ); if( randomize && BUGGIFY ) { FASTRESTORE_APPLIER_FETCH_KEYS_SIZE = deterministicRandom()->random01() * 10240 + 1; }
init( FASTRESTORE_LOADER_SEND_MUTATION_MSG_BYTES, 1.0 * 1024.0 * 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_LOADER_SEND_MUTATION_MSG_BYTES = deterministicRandom()->random01() * 10.0 * 1024.0 * 1024.0 + 1; }
init( FASTRESTORE_GET_RANGE_VERSIONS_EXPENSIVE, false ); if( randomize && BUGGIFY ) { FASTRESTORE_GET_RANGE_VERSIONS_EXPENSIVE = deterministicRandom()->random01() < 0.5 ? true : false; }
init( FASTRESTORE_REQBATCH_PARALLEL, 50 ); if( randomize && BUGGIFY ) { FASTRESTORE_REQBATCH_PARALLEL = deterministicRandom()->random01() * 100 + 1; }
// clang-format on

View File

@ -549,6 +549,7 @@ public:
int64_t FASTRESTORE_APPLIER_FETCH_KEYS_SIZE; // number of keys to fetch in a txn on applier
int64_t FASTRESTORE_LOADER_SEND_MUTATION_MSG_BYTES; // desired size of mutation message sent from loader to appliers
bool FASTRESTORE_GET_RANGE_VERSIONS_EXPENSIVE; // parse each range file to get (range, version) it has?
int64_t FASTRESTORE_REQBATCH_PARALLEL; // number of requests to wait on for getBatchReplies()
ServerKnobs();
void initialize(bool randomize = false, ClientKnobs* clientKnobs = NULL, bool isSimulated = false);

View File

@ -93,8 +93,34 @@ struct ProxyStats {
Future<Void> logger;
int recentRequests;
Deque<int> requestBuckets;
double lastBucketBegin;
double bucketInterval;
void updateRequestBuckets() {
while(now() - lastBucketBegin > bucketInterval) {
lastBucketBegin += bucketInterval;
recentRequests -= requestBuckets.front();
requestBuckets.pop_front();
requestBuckets.push_back(0);
}
}
void addRequest() {
updateRequestBuckets();
++recentRequests;
++requestBuckets.back();
}
int getRecentRequests() {
updateRequestBuckets();
return recentRequests*FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE-(lastBucketBegin+bucketInterval-now()));
}
explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion, int64_t *commitBatchesMemBytesCountPtr)
: cc("ProxyStats", id.toString()), txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc),
: cc("ProxyStats", id.toString()), recentRequests(0), lastBucketBegin(now()), bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc),
txnRequestErrors("TxnRequestErrors", cc), txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc),
txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc),
txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc),
@ -117,6 +143,9 @@ struct ProxyStats {
specialCounter(cc, "CommittedVersion", [pCommittedVersion](){ return pCommittedVersion->get(); });
specialCounter(cc, "CommitBatchesMemBytesCount", [commitBatchesMemBytesCountPtr]() { return *commitBatchesMemBytesCountPtr; });
logger = traceCounters("ProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ProxyMetrics");
for(int i = 0; i < FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS; i++) {
requestBuckets.push_back(0);
}
}
};
@ -269,6 +298,7 @@ ACTOR Future<Void> queueTransactionStartRequests(
loop choose{
when(GetReadVersionRequest req = waitNext(readVersionRequests)) {
//WARNING: this code is run at a high priority, so it needs to do as little work as possible
stats->addRequest();
if( stats->txnRequestIn.getValue() - stats->txnRequestOut.getValue() > SERVER_KNOBS->START_TRANSACTION_MAX_QUEUE_SIZE ) {
++stats->txnRequestErrors;
//FIXME: send an error instead of giving an unreadable version when the client can support the error: req.reply.sendError(proxy_memory_limit_exceeded());
@ -568,6 +598,7 @@ ACTOR Future<Void> commitBatcher(ProxyCommitData *commitData, PromiseStream<std:
choose{
when(CommitTransactionRequest req = waitNext(in)) {
//WARNING: this code is run at a high priority, so it needs to do as little work as possible
commitData->stats.addRequest();
int bytes = getBytes(req);
// Drop requests if memory is under severe pressure
@ -1361,6 +1392,7 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commi
rep.version = commitData->committedVersion.get();
rep.locked = commitData->locked;
rep.metadataVersion = commitData->metadataVersion;
rep.recentRequests = commitData->stats.getRecentRequests();
for (auto v : versions) {
if(v.version > rep.version) {
@ -1628,6 +1660,7 @@ ACTOR static Future<Void> readRequestServer( MasterProxyInterface proxy, Promise
loop {
GetKeyServerLocationsRequest req = waitNext(proxy.getKeyServersLocations.getFuture());
//WARNING: this code is run at a high priority, so it needs to do as little work as possible
commitData->stats.addRequest();
if(req.limit != CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT && //Always do data distribution requests
commitData->stats.keyServerLocationIn.getValue() - commitData->stats.keyServerLocationOut.getValue() > SERVER_KNOBS->KEY_LOCATION_MAX_QUEUE_SIZE) {
++commitData->stats.keyServerLocationErrors;

View File

@ -84,14 +84,14 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
updateProcessStatsTimer = delay(SERVER_KNOBS->FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL);
}
when(wait(exitRole)) {
TraceEvent("FastRestore").detail("RestoreApplierCore", "ExitRole").detail("NodeID", self->id());
TraceEvent("RestoreApplierCoreExitRole", self->id());
break;
}
}
} catch (Error& e) {
TraceEvent(SevWarn, "FastRestore")
.detail("RestoreLoaderError", e.what())
.detail("RequestType", requestTypeStr);
TraceEvent(SevWarn, "FastRestoreApplierError", self->id())
.detail("RequestType", requestTypeStr)
.error(e, true);
break;
}
}
@ -112,7 +112,7 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
// Note: Insert new items into processedFileState will not invalidate the reference.
state NotifiedVersion& curMsgIndex = batchData->processedFileState[req.asset];
TraceEvent(SevDebug, "FastRestoreApplierPhaseReceiveMutations", self->id())
TraceEvent(SevInfo, "FastRestoreApplierPhaseReceiveMutations", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("RestoreAsset", req.asset.toString())
.detail("RestoreAssetMesssageIndex", curMsgIndex.get())
@ -128,35 +128,35 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
state bool isDuplicated = true;
if (curMsgIndex.get() == req.msgIndex - 1) {
isDuplicated = false;
ASSERT(req.mutations.size() == req.mVersions.size());
for (int mIndex = 0; mIndex < req.mutations.size(); mIndex++) {
const MutationRef& mutation = req.mutations[mIndex];
const LogMessageVersion mutationVersion(req.mVersions[mIndex]);
for (int mIndex = 0; mIndex < req.versionedMutations.size(); mIndex++) {
const VersionedMutation& versionedMutation = req.versionedMutations[mIndex];
TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations", self->id())
.detail("RestoreAsset", req.asset.toString())
.detail("Version", mutationVersion.toString())
.detail("Version", versionedMutation.version.toString())
.detail("Index", mIndex)
.detail("MutationReceived", mutation.toString());
batchData->counters.receivedBytes += mutation.totalSize();
batchData->counters.receivedWeightedBytes += mutation.weightedTotalSize(); // atomicOp will be amplified
.detail("MutationReceived", versionedMutation.mutation.toString());
batchData->counters.receivedBytes += versionedMutation.mutation.totalSize();
batchData->counters.receivedWeightedBytes +=
versionedMutation.mutation.weightedTotalSize(); // atomicOp will be amplified
batchData->counters.receivedMutations += 1;
batchData->counters.receivedAtomicOps += isAtomicOp((MutationRef::Type)mutation.type) ? 1 : 0;
batchData->counters.receivedAtomicOps +=
isAtomicOp((MutationRef::Type)versionedMutation.mutation.type) ? 1 : 0;
// Sanity check
ASSERT_WE_THINK(req.asset.isInVersionRange(mutationVersion.version));
ASSERT_WE_THINK(req.asset.isInKeyRange(mutation));
ASSERT_WE_THINK(req.asset.isInVersionRange(versionedMutation.version.version));
ASSERT_WE_THINK(req.asset.isInKeyRange(versionedMutation.mutation));
// Note: Log and range mutations may be delivered out of order. Can we handle it?
batchData->addMutation(mutation, mutationVersion);
batchData->addMutation(versionedMutation.mutation, versionedMutation.version);
ASSERT(mutation.type != MutationRef::SetVersionstampedKey &&
mutation.type != MutationRef::SetVersionstampedValue);
ASSERT(versionedMutation.mutation.type != MutationRef::SetVersionstampedKey &&
versionedMutation.mutation.type != MutationRef::SetVersionstampedValue);
}
curMsgIndex.set(req.msgIndex);
}
req.reply.send(RestoreCommonReply(self->id(), isDuplicated));
TraceEvent(SevDebug, "FastRestoreApplierPhaseReceiveMutationsDone", self->id())
TraceEvent(SevInfo, "FastRestoreApplierPhaseReceiveMutationsDone", self->id())
.detail("BatchIndex", req.batchIndex)
.detail("RestoreAsset", req.asset.toString())
.detail("ProcessedMessageIndex", curMsgIndex.get())
@ -447,7 +447,7 @@ ACTOR static Future<Void> applyStagingKeys(Reference<ApplierBatchData> batchData
// Write mutations to the destination DB
ACTOR Future<Void> writeMutationsToDB(UID applierID, int64_t batchIndex, Reference<ApplierBatchData> batchData,
Database cx) {
TraceEvent("FastRestoreApplerPhaseApplyTxn", applierID).detail("BatchIndex", batchIndex);
TraceEvent("FastRestoreApplerPhaseApplyTxnStart", applierID).detail("BatchIndex", batchIndex);
wait(precomputeMutationsResult(batchData, applierID, batchIndex, cx));
wait(applyStagingKeys(batchData, applierID, batchIndex, cx));
@ -521,7 +521,7 @@ Value applyAtomicOp(Optional<StringRef> existingValue, Value value, MutationRef:
else {
TraceEvent(SevError, "ApplyAtomicOpUnhandledType")
.detail("TypeCode", (int)type)
.detail("TypeName", typeString[type]);
.detail("TypeName", getTypeString(type));
ASSERT(false);
}
return Value();

View File

@ -88,7 +88,7 @@ struct StagingKey {
TraceEvent("StagingKeyAdd")
.detail("Version", version.toString())
.detail("NewVersion", newVersion.toString())
.detail("MType", typeString[(int)type])
.detail("MType", getTypeString(type))
.detail("Key", key)
.detail("Val", val)
.detail("NewMutation", m.toString());
@ -118,13 +118,12 @@ struct StagingKey {
// Precompute the final value of the key.
// TODO: Look at the last LogMessageVersion, if it set or clear, we can ignore the rest of versions.
void precomputeResult(const char* context) {
// TODO: Change typeString[(int)type] to a safe function that validate type range
TraceEvent(SevDebug, "FastRestoreApplierPrecomputeResult")
.detail("Context", context)
.detail("Version", version.toString())
.detail("Key", key)
.detail("Value", val)
.detail("MType", type < MutationRef::MAX_ATOMIC_OP ? typeString[(int)type] : "[Unset]")
.detail("MType", type < MutationRef::MAX_ATOMIC_OP ? getTypeString(type) : "[Unset]")
.detail("LargestPendingVersion",
(pendingMutations.empty() ? "[none]" : pendingMutations.rbegin()->first.toString()));
std::map<LogMessageVersion, Standalone<MutationRef>>::iterator lb = pendingMutations.lower_bound(version);
@ -138,8 +137,8 @@ struct StagingKey {
if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) {
if (std::tie(type, key, val) != std::tie(m.type, m.param1, m.param2)) {
TraceEvent(SevError, "FastRestoreApplierPrecomputeResultUnhandledSituation")
.detail("BufferedType", typeString[type])
.detail("PendingType", typeString[m.type])
.detail("BufferedType", getTypeString(type))
.detail("PendingType", getTypeString(m.type))
.detail("BufferedVal", val.toString())
.detail("PendingVal", m.param2.toString());
}
@ -169,11 +168,11 @@ struct StagingKey {
} else if (mutation.type == MutationRef::SetValue || mutation.type == MutationRef::ClearRange) {
type = MutationRef::SetValue; // Precomputed result should be set to DB.
TraceEvent(SevError, "FastRestoreApplierPrecomputeResultUnexpectedSet")
.detail("MutationType", typeString[mutation.type])
.detail("MutationType", getTypeString(mutation.type))
.detail("Version", lb->first.toString());
} else {
TraceEvent(SevWarnAlways, "FastRestoreApplierPrecomputeResultSkipUnexpectedBackupMutation")
.detail("MutationType", typeString[mutation.type])
.detail("MutationType", getTypeString(mutation.type))
.detail("Version", lb->first.toString());
}
ASSERT(lb->first > version);
@ -267,9 +266,9 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
explicit ApplierBatchData(UID nodeID, int batchIndex)
: counters(this, nodeID, batchIndex), applyStagingKeysBatchLock(SERVER_KNOBS->FASTRESTORE_APPLYING_PARALLELISM),
vbState(ApplierVersionBatchState::NOT_INIT) {
pollMetrics =
traceCounters("FastRestoreApplierMetrics", nodeID, SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY,
&counters.cc, nodeID.toString() + "/RestoreApplierMetrics/" + std::to_string(batchIndex));
pollMetrics = traceCounters(format("FastRestoreApplierMetrics%d", batchIndex), nodeID,
SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY, &counters.cc,
nodeID.toString() + "/RestoreApplierMetrics/" + std::to_string(batchIndex));
TraceEvent("FastRestoreApplierMetricsCreated").detail("Node", nodeID);
}
~ApplierBatchData() = default;
@ -328,7 +327,7 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
isAtomicOp((MutationRef::Type)m->type))
continue;
else {
TraceEvent(SevError, "FastRestore").detail("UnknownMutationType", m->type);
TraceEvent(SevError, "FastRestoreApplier").detail("UnknownMutationType", m->type);
return false;
}
}

View File

@ -296,7 +296,8 @@ Future<Void> getBatchReplies(RequestStream<Request> Interface::*channel, std::ma
if (ongoingReplies.empty()) {
break;
} else {
wait(waitForAny(ongoingReplies));
wait(quorum(ongoingReplies, std::min((int)SERVER_KNOBS->FASTRESTORE_REQBATCH_PARALLEL,
(int)ongoingReplies.size())));
}
// At least one reply is received; Calculate the reply duration
for (int j = 0; j < ongoingReplies.size(); ++j) {
@ -352,12 +353,14 @@ Future<Void> getBatchReplies(RequestStream<Request> Interface::*channel, std::ma
break;
} catch (Error& e) {
if (e.code() == error_code_operation_cancelled) break;
fprintf(stdout, "sendBatchRequests Error code:%d, error message:%s\n", e.code(), e.what());
// fprintf(stdout, "sendBatchRequests Error code:%d, error message:%s\n", e.code(), e.what());
TraceEvent(SevWarn, "FastRestoreSendBatchRequests").error(e);
for (auto& request : requests) {
TraceEvent(SevWarn, "FastRestore")
TraceEvent(SevWarn, "FastRestoreLoader")
.detail("SendBatchRequests", requests.size())
.detail("RequestID", request.first)
.detail("Request", request.second.toString());
resetReply(request.second);
}
}
}

View File

@ -29,12 +29,11 @@
#include "flow/actorcompiler.h" // This must be the last #include.
// SerializedMutationListMap:
// Key is the signature/version of the mutation list, Value is the mutation list (or part of the mutation list)
typedef std::map<Standalone<StringRef>, Standalone<StringRef>> SerializedMutationListMap;
// SerializedMutationPartMap:
// Key has the same semantics as SerializedMutationListMap; Value is the part number of the splitted mutation list
typedef std::map<Standalone<StringRef>, uint32_t> SerializedMutationPartMap;
// SerializedMutationListMap: Buffered mutation lists from data blocks in log files
// Key is the signature/version of the mutation list; Value.first is the mutation list which may come from multiple
// data blocks of log file; Value.second is the largest part number of the mutation list, which is used to sanity check
// the data blocks for the same mutation list are concatenated in increasing order of part number.
typedef std::map<Standalone<StringRef>, std::pair<Standalone<StringRef>, uint32_t>> SerializedMutationListMap;
std::vector<UID> getApplierIDs(std::map<Key, UID>& rangeToApplier);
void splitMutation(std::map<Key, UID>* pRangeToApplier, MutationRef m, Arena& mvector_arena,
@ -54,7 +53,6 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
std::map<UID, RestoreApplierInterface>* pApplierInterfaces);
ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pProcessedFileOffset,
SerializedMutationListMap* mutationMap,
SerializedMutationPartMap* mutationPartMap,
Reference<IBackupContainer> bc, RestoreAsset asset);
ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
@ -113,14 +111,12 @@ ACTOR Future<Void> restoreLoaderCore(RestoreLoaderInterface loaderInterf, int no
updateProcessStatsTimer = delay(SERVER_KNOBS->FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL);
}
when(wait(exitRole)) {
TraceEvent("FastRestore").detail("RestoreLoaderCore", "ExitRole").detail("NodeID", self->id());
TraceEvent("FastRestoreLoaderCoreExitRole", self->id());
break;
}
}
} catch (Error& e) {
TraceEvent(SevWarn, "FastRestore")
.detail("RestoreLoaderError", e.what())
.detail("RequestType", requestTypeStr);
TraceEvent(SevWarn, "FastRestoreLoader", self->id()).detail("RequestType", requestTypeStr).error(e, true);
break;
}
}
@ -188,7 +184,7 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
int rLen = wait(file->read(mutateString(buf), asset.len, asset.offset));
if (rLen != asset.len) throw restore_bad_read();
TraceEvent("FastRestore")
TraceEvent("FastRestoreLoader")
.detail("DecodingLogFile", asset.filename)
.detail("Offset", asset.offset)
.detail("Length", asset.len);
@ -277,7 +273,6 @@ ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions, Lo
// Must use StandAlone to save mutations, otherwise, the mutationref memory will be corrupted
// mutationMap: Key is the unique identifier for a batch of mutation logs at the same version
state SerializedMutationListMap mutationMap;
state std::map<Standalone<StringRef>, uint32_t> mutationPartMap; // Sanity check the data parsing is correct
state NotifiedVersion processedFileOffset(0);
state std::vector<Future<Void>> fileParserFutures;
state std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsPerLPIter = batchData->kvOpsPerLP.end();
@ -310,8 +305,8 @@ ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions, Lo
kvOpsPerLPIter, samplesIter,
&batchData->counters, bc, subAsset));
} else {
fileParserFutures.push_back(_parseLogFileToMutationsOnLoader(&processedFileOffset, &mutationMap,
&mutationPartMap, bc, subAsset));
fileParserFutures.push_back(
_parseLogFileToMutationsOnLoader(&processedFileOffset, &mutationMap, bc, subAsset));
}
}
}
@ -458,7 +453,6 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
state VersionedMutationsMap::iterator kvOp = kvOps.begin();
state int kvCount = 0;
state int splitMutationIndex = 0;
state std::vector<std::pair<UID, RestoreSendVersionedMutationsRequest>> requests;
state Version msgIndex = 1; // Monotonically increased index for send message, must start at 1
state std::vector<UID> applierIDs = getApplierIDs(*pRangeToApplier);
state double msgSize = 0; // size of mutations in the message
@ -483,22 +477,20 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
splitMutationIndex = 0;
kvCount = 0;
// applierMutationsBuffer is the mutation vector to be sent to each applier
// applierMutationsSize is buffered mutation vector size for each applier
state std::map<UID, MutationsVec> applierMutationsBuffer;
state std::map<UID, LogMessageVersionVec> applierVersionsBuffer;
state std::map<UID, double> applierMutationsSize;
// applierVersionedMutationsBuffer is the mutation-and-its-version vector to be sent to each applier
state std::map<UID, VersionedMutationsVec> applierVersionedMutationsBuffer;
state int mIndex = 0;
state LogMessageVersion commitVersion;
state std::vector<Future<Void>> fSends;
for (auto& applierID : applierIDs) {
applierMutationsBuffer[applierID] = MutationsVec();
applierVersionsBuffer[applierID] = LogMessageVersionVec();
applierMutationsSize[applierID] = 0.0;
applierVersionedMutationsBuffer[applierID] = VersionedMutationsVec();
}
for (kvOp = kvOps.begin(); kvOp != kvOps.end(); kvOp++) {
const LogMessageVersion& commitVersion = kvOp->first;
commitVersion = kvOp->first;
ASSERT(commitVersion.version >= asset.beginVersion);
ASSERT(commitVersion.version <= asset.endVersion); // endVersion is an empty commit to ensure progress
for (const MutationRef& kvm : kvOp->second) {
for (mIndex = 0; mIndex < kvOp->second.size(); mIndex++) {
MutationRef& kvm = kvOp->second[mIndex];
// Send the mutation to applier
if (isRangeMutation(kvm)) {
MutationsVec mvector;
@ -526,9 +518,10 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
.detail("Version", commitVersion.toString())
.detail("Mutation", mutation.toString());
}
applierMutationsBuffer[applierID].push_back_deep(applierMutationsBuffer[applierID].arena(), mutation);
applierVersionsBuffer[applierID].push_back(applierVersionsBuffer[applierID].arena(), commitVersion);
applierMutationsSize[applierID] += mutation.expectedSize();
// CAREFUL: The splitted mutations' lifetime is shorter than the for-loop
// Must use deep copy for splitted mutations
applierVersionedMutationsBuffer[applierID].push_back_deep(
applierVersionedMutationsBuffer[applierID].arena(), VersionedMutation(mutation, commitVersion));
msgSize += mutation.expectedSize();
kvCount++;
@ -546,50 +539,65 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
.detail("Version", commitVersion.toString())
.detail("Mutation", kvm.toString());
}
applierMutationsBuffer[applierID].push_back_deep(applierMutationsBuffer[applierID].arena(), kvm);
applierVersionsBuffer[applierID].push_back(applierVersionsBuffer[applierID].arena(), commitVersion);
applierMutationsSize[applierID] += kvm.expectedSize();
// kvm data is saved in pkvOps in batchData, so shallow copy is ok here.
applierVersionedMutationsBuffer[applierID].push_back(applierVersionedMutationsBuffer[applierID].arena(),
VersionedMutation(kvm, commitVersion));
msgSize += kvm.expectedSize();
}
} // Mutations at the same LogMessageVersion
// Batch same Version's mutations in one request. We could batch more by
// changing the version comparison below.
auto next = std::next(kvOp, 1);
if (next == kvOps.end() || commitVersion.version < next->first.version) {
// if (next == kvOps.end() || msgSize >= SERVER_KNOBS->FASTRESTORE_LOADER_SEND_MUTATION_MSG_BYTES) {
// TODO: Sanity check each asset has been received exactly once!
// Send the mutations to appliers for each version
for (const UID& applierID : applierIDs) {
requests.emplace_back(applierID,
RestoreSendVersionedMutationsRequest(batchIndex, asset, msgIndex, isRangeFile,
applierMutationsBuffer[applierID],
applierVersionsBuffer[applierID]));
// Batch mutations at multiple versions up to FASTRESTORE_LOADER_SEND_MUTATION_MSG_BYTES size
// to improve bandwidth from a loader to appliers
if (msgSize >= SERVER_KNOBS->FASTRESTORE_LOADER_SEND_MUTATION_MSG_BYTES) {
std::vector<std::pair<UID, RestoreSendVersionedMutationsRequest>> requests;
for (const UID& applierID : applierIDs) {
requests.emplace_back(
applierID, RestoreSendVersionedMutationsRequest(batchIndex, asset, msgIndex, isRangeFile,
applierVersionedMutationsBuffer[applierID]));
}
TraceEvent(SevDebug, "FastRestoreLoaderSendMutationToApplier")
.detail("MessageIndex", msgIndex)
.detail("RestoreAsset", asset.toString())
.detail("Requests", requests.size());
fSends.push_back(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces,
requests, TaskPriority::RestoreLoaderSendMutations));
msgIndex++;
msgSize = 0;
for (auto& applierID : applierIDs) {
applierVersionedMutationsBuffer[applierID] = VersionedMutationsVec();
}
}
TraceEvent(SevDebug, "FastRestoreLoaderSendMutationToApplier")
.detail("MessageIndex", msgIndex)
.detail("RestoreAsset", asset.toString())
.detail("Requests", requests.size());
wait(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests,
TaskPriority::RestoreLoaderSendMutations));
msgIndex++;
msgSize = 0;
requests.clear();
for (auto& applierID : applierIDs) {
applierMutationsBuffer[applierID] = MutationsVec();
applierVersionsBuffer[applierID] = LogMessageVersionVec();
applierMutationsSize[applierID] = 0.0;
}
}
} // Mutations at the same LogMessageVersion
} // all versions of mutations in the same file
TraceEvent("FastRestore").detail("LoaderSendMutationOnAppliers", kvCount);
// Send the remaining mutations in the applierMutationsBuffer
if (msgSize > 0) {
// TODO: Sanity check each asset has been received exactly once!
std::vector<std::pair<UID, RestoreSendVersionedMutationsRequest>> requests;
for (const UID& applierID : applierIDs) {
requests.emplace_back(applierID,
RestoreSendVersionedMutationsRequest(batchIndex, asset, msgIndex, isRangeFile,
applierVersionedMutationsBuffer[applierID]));
}
TraceEvent(SevDebug, "FastRestoreLoaderSendMutationToApplier")
.detail("MessageIndex", msgIndex)
.detail("RestoreAsset", asset.toString())
.detail("Requests", requests.size());
fSends.push_back(sendBatchRequests(&RestoreApplierInterface::sendMutationVector, *pApplierInterfaces, requests,
TaskPriority::RestoreLoaderSendMutations));
}
wait(waitForAll(fSends));
kvOps = VersionedMutationsMap(); // Free memory for parsed mutations at the restore asset.
TraceEvent("FastRestoreLoaderSendMutationToAppliers")
.detail("BatchIndex", batchIndex)
.detail("RestoreAsset", asset.toString())
.detail("Mutations", kvCount);
return Void();
}
void splitMutation(std::map<Key, UID>* pRangeToApplier, MutationRef m, Arena& mvector_arena,
VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs) {
TraceEvent(SevWarn, "FastRestoreSplitMutation").detail("Mutation", m.toString());
TraceEvent(SevDebug, "FastRestoreSplitMutation").detail("Mutation", m.toString());
// mvector[i] should be mapped to nodeID[i]
ASSERT(mvector.empty());
ASSERT(nodeIDs.empty());
@ -646,12 +654,9 @@ void splitMutation(std::map<Key, UID>* pRangeToApplier, MutationRef m, Arena& mv
// key_input format:
// [logRangeMutation.first][hash_value_of_commit_version:1B][bigEndian64(commitVersion)][bigEndian32(part)]
// value_input: serialized binary of mutations at the same version
bool concatenateBackupMutationForLogFile(std::map<Standalone<StringRef>, Standalone<StringRef>>* pMutationMap,
std::map<Standalone<StringRef>, uint32_t>* pMutationPartMap,
Standalone<StringRef> key_input, Standalone<StringRef> val_input,
const RestoreAsset& asset) {
bool concatenateBackupMutationForLogFile(SerializedMutationListMap* pMutationMap, Standalone<StringRef> key_input,
Standalone<StringRef> val_input, const RestoreAsset& asset) {
SerializedMutationListMap& mutationMap = *pMutationMap;
std::map<Standalone<StringRef>, uint32_t>& mutationPartMap = *pMutationPartMap;
const int key_prefix_len = sizeof(uint8_t) + sizeof(Version) + sizeof(uint32_t);
StringRefReader readerKey(key_input, restore_corrupted_data()); // read key_input!
@ -678,19 +683,19 @@ bool concatenateBackupMutationForLogFile(std::map<Standalone<StringRef>, Standal
auto it = mutationMap.find(id);
if (it == mutationMap.end()) {
mutationMap.insert(std::make_pair(id, val_input));
mutationMap.emplace(id, std::make_pair(val_input, 0));
if (part != 0) {
TraceEvent(SevError, "FastRestore")
TraceEvent(SevError, "FastRestoreLoader")
.detail("FirstPartNotZero", part)
.detail("KeyInput", getHexString(key_input));
}
mutationPartMap.insert(std::make_pair(id, part));
} else { // Concatenate the val string with the same commitVersion
it->second = it->second.contents().withSuffix(val_input.contents()); // Assign the new Areana to the map's value
auto& currentPart = mutationPartMap[id];
it->second.first =
it->second.first.contents().withSuffix(val_input.contents()); // Assign the new Areana to the map's value
auto& currentPart = it->second.second;
if (part != (currentPart + 1)) {
// Check if the same range or log file has been processed more than once!
TraceEvent(SevError, "FastRestore")
TraceEvent(SevError, "FastRestoreLoader")
.detail("CurrentPart1", currentPart)
.detail("CurrentPart2", part)
.detail("KeyInput", getHexString(key_input))
@ -726,7 +731,7 @@ void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
for (auto& m : mutationMap) {
StringRef k = m.first.contents();
StringRef val = m.second.contents();
StringRef val = m.second.first.contents();
StringRefReader kReader(k, restore_corrupted_data());
uint64_t commitVersion = kReader.consume<uint64_t>(); // Consume little Endian data
@ -821,7 +826,7 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
try {
Standalone<VectorRef<KeyValueRef>> kvs =
wait(fileBackup::decodeRangeFileBlock(inFile, asset.offset, asset.len));
TraceEvent("FastRestore")
TraceEvent("FastRestoreLoader")
.detail("DecodedRangeFile", asset.filename)
.detail("DataSize", kvs.contents().size());
blockData = kvs;
@ -894,13 +899,12 @@ ACTOR static Future<Void> _parseRangeFileToMutationsOnLoader(
// pMutationMap: concatenated mutation list string at the mutation's commit version
ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pProcessedFileOffset,
SerializedMutationListMap* pMutationMap,
SerializedMutationPartMap* pMutationPartMap,
Reference<IBackupContainer> bc, RestoreAsset asset) {
Reference<IAsyncFile> inFile = wait(bc->readFile(asset.filename));
// decodeLogFileBlock() must read block by block!
state Standalone<VectorRef<KeyValueRef>> data =
wait(parallelFileRestore::decodeLogFileBlock(inFile, asset.offset, asset.len));
TraceEvent("FastRestore")
TraceEvent("FastRestoreLoader")
.detail("DecodedLogFile", asset.filename)
.detail("Offset", asset.offset)
.detail("Length", asset.len)
@ -912,7 +916,7 @@ ACTOR static Future<Void> _parseLogFileToMutationsOnLoader(NotifiedVersion* pPro
if (pProcessedFileOffset->get() == asset.offset) {
for (const KeyValueRef& kv : data) {
// Concatenate the backuped param1 and param2 (KV) at the same version.
concatenateBackupMutationForLogFile(pMutationMap, pMutationPartMap, kv.key, kv.value, asset);
concatenateBackupMutationForLogFile(pMutationMap, kv.key, kv.value, asset);
}
pProcessedFileOffset->set(asset.offset + asset.len);
}

View File

@ -92,9 +92,9 @@ struct LoaderBatchData : public ReferenceCounted<LoaderBatchData> {
} counters;
explicit LoaderBatchData(UID nodeID, int batchIndex) : counters(this, nodeID, batchIndex), vbState(LoaderVersionBatchState::NOT_INIT) {
pollMetrics =
traceCounters("FastRestoreLoaderMetrics", nodeID, SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY,
&counters.cc, nodeID.toString() + "/RestoreLoaderMetrics/" + std::to_string(batchIndex));
pollMetrics = traceCounters(format("FastRestoreLoaderMetrics%d", batchIndex), nodeID,
SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY, &counters.cc,
nodeID.toString() + "/RestoreLoaderMetrics/" + std::to_string(batchIndex));
TraceEvent("FastRestoreLoaderMetricsCreated").detail("Node", nodeID);
}
@ -169,7 +169,7 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
}
void initVersionBatch(int batchIndex) {
TraceEvent("FastRestore").detail("InitVersionBatchOnLoader", nodeID);
TraceEvent("FastRestoreLoaderInitVersionBatch", nodeID).detail("BatchIndex", batchIndex);
batch[batchIndex] = Reference<LoaderBatchData>(new LoaderBatchData(nodeID, batchIndex));
status[batchIndex] = Reference<LoaderBatchStatus>(new LoaderBatchStatus());
}

View File

@ -196,7 +196,7 @@ ACTOR Future<Void> startProcessRestoreRequests(Reference<RestoreMasterData> self
state int numTries = 0;
state int restoreIndex = 0;
TraceEvent("FastRestoreMasterWaitOnRestoreRequests", self->id());
TraceEvent("FastRestoreMasterWaitOnRestoreRequests", self->id()).detail("RestoreRequests", restoreRequests.size());
// DB has been locked where restore request is submitted
wait(clearDB(cx));
@ -326,7 +326,7 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreMasterData>
wait(waitForAll(fBatches));
TraceEvent("FastRestore").detail("RestoreToVersion", request.targetVersion);
TraceEvent("FastRestoreMaster").detail("RestoreToVersion", request.targetVersion);
return request.targetVersion;
}
@ -494,6 +494,12 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMas
VersionBatch versionBatch) {
state Reference<MasterBatchData> batchData = self->batch[batchIndex];
state Reference<MasterBatchStatus> batchStatus = self->batchStatus[batchIndex];
state double startTime = now();
TraceEvent("FastRestoreMasterDispatchVersionBatchesStart")
.detail("BatchIndex", batchIndex)
.detail("BatchSize", versionBatch.size)
.detail("RunningVersionBatches", self->runningVersionBatches.get());
self->runningVersionBatches.set(self->runningVersionBatches.get() + 1);
@ -540,6 +546,13 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMas
if (self->delayedActors > 0) {
self->checkMemory.trigger();
}
TraceEvent("FastRestoreMasterDispatchVersionBatchesDone")
.detail("BatchIndex", batchIndex)
.detail("BatchSize", versionBatch.size)
.detail("RunningVersionBatches", self->runningVersionBatches.get())
.detail("Latency", now() - startTime);
return Void();
}
@ -636,6 +649,8 @@ ACTOR static Future<Standalone<VectorRef<RestoreRequest>>> collectRestoreRequest
TraceEvent("FastRestoreMasterPhaseCollectRestoreRequests")
.detail("RestoreRequest", restoreRequests.back().toString());
}
} else {
TraceEvent(SevWarnAlways, "FastRestoreMasterPhaseCollectRestoreRequestsEmptyRequests");
}
break;
}
@ -814,17 +829,17 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<MasterBatchDat
Reference<MasterBatchStatus> batchStatus,
std::map<UID, RestoreApplierInterface> appliersInterf,
int batchIndex, NotifiedVersion* finishedBatch) {
wait(finishedBatch->whenAtLeast(batchIndex - 1));
TraceEvent("FastRestoreMasterPhaseApplyToDB")
TraceEvent("FastRestoreMasterPhaseApplyToDBStart")
.detail("BatchIndex", batchIndex)
.detail("FinishedBatch", finishedBatch->get());
wait(finishedBatch->whenAtLeast(batchIndex - 1));
if (finishedBatch->get() == batchIndex - 1) {
// Prepare the applyToDB requests
std::vector<std::pair<UID, RestoreVersionBatchRequest>> requests;
TraceEvent("FastRestoreMasterPhaseApplyToDB")
TraceEvent("FastRestoreMasterPhaseApplyToDBRunning")
.detail("BatchIndex", batchIndex)
.detail("Appliers", appliersInterf.size());
for (auto& applier : appliersInterf) {
@ -943,7 +958,7 @@ ACTOR static Future<Void> signalRestoreCompleted(Reference<RestoreMasterData> se
}
}
TraceEvent("FastRestore").detail("RestoreMaster", "AllRestoreCompleted");
TraceEvent("FastRestoreMasterAllRestoreCompleted");
return Void();
}

View File

@ -89,7 +89,7 @@ struct MasterBatchData : public ReferenceCounted<MasterBatchData> {
if (applierToRange.find(applier.second) == applierToRange.end()) {
applierToRange[applier.second] = applier.first;
} else {
TraceEvent(SevError, "FastRestore")
TraceEvent(SevError, "FastRestoreMaster")
.detail("SanityCheckApplierKeyRange", applierToRange.size())
.detail("ApplierID", applier.second)
.detail("Key1", applierToRange[applier.second])

View File

@ -100,6 +100,7 @@ void updateProcessStats(Reference<RestoreRoleData> self) {
// in increasing order of their version batch.
ACTOR Future<Void> isSchedulable(Reference<RestoreRoleData> self, int actorBatchIndex, std::string name) {
self->delayedActors++;
state double memoryThresholdBytes = SERVER_KNOBS->FASTRESTORE_MEMORY_THRESHOLD_MB_SOFT * 1024 * 1024;
loop {
double memory = getSystemStatistics().processMemory;
if (g_network->isSimulated() && BUGGIFY) {
@ -107,13 +108,13 @@ ACTOR Future<Void> isSchedulable(Reference<RestoreRoleData> self, int actorBatch
// memory will be larger than threshold when deterministicRandom()->random01() > 1/2
memory = SERVER_KNOBS->FASTRESTORE_MEMORY_THRESHOLD_MB_SOFT * 2 * deterministicRandom()->random01();
}
if (memory < SERVER_KNOBS->FASTRESTORE_MEMORY_THRESHOLD_MB_SOFT ||
self->finishedBatch.get() + 1 == actorBatchIndex) {
if (memory >= SERVER_KNOBS->FASTRESTORE_MEMORY_THRESHOLD_MB_SOFT) {
if (memory < memoryThresholdBytes || self->finishedBatch.get() + 1 == actorBatchIndex) {
if (memory >= memoryThresholdBytes) {
TraceEvent(SevWarn, "FastRestoreMemoryUsageAboveThreshold")
.detail("BatchIndex", actorBatchIndex)
.detail("FinishedBatch", self->finishedBatch.get())
.detail("Actor", name);
.detail("Actor", name)
.detail("Memory", memory);
}
self->delayedActors--;
break;

View File

@ -38,8 +38,25 @@
#define SevFRMutationInfo SevVerbose
//#define SevFRMutationInfo SevInfo
struct VersionedMutation {
MutationRef mutation;
LogMessageVersion version;
VersionedMutation() = default;
explicit VersionedMutation(MutationRef mutation, LogMessageVersion version)
: mutation(mutation), version(version) {}
explicit VersionedMutation(Arena& arena, const VersionedMutation& vm)
: mutation(arena, vm.mutation), version(vm.version) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, mutation, version);
}
};
using MutationsVec = Standalone<VectorRef<MutationRef>>;
using LogMessageVersionVec = Standalone<VectorRef<LogMessageVersion>>;
using VersionedMutationsVec = Standalone<VectorRef<VersionedMutation>>;
enum class RestoreRole { Invalid = 0, Master = 1, Loader, Applier };
BINARY_SERIALIZABLE(RestoreRole);

View File

@ -66,7 +66,7 @@ ACTOR Future<Void> handlerTerminateWorkerRequest(RestoreSimpleRequest req, Refer
return Void();
}));
TraceEvent("FastRestore").detail("HandleTerminateWorkerReq", self->id());
TraceEvent("FastRestoreWorker").detail("HandleTerminateWorkerReq", self->id());
return Void();
}
@ -97,7 +97,7 @@ void handleRecruitRoleRequest(RestoreRecruitRoleRequest req, Reference<RestoreWo
DUMPTOKEN(recruited.collectRestoreRoleInterfaces);
DUMPTOKEN(recruited.finishRestore);
actors->add(restoreLoaderCore(self->loaderInterf.get(), req.nodeIndex, cx));
TraceEvent("FastRestore").detail("RecruitedLoaderNodeIndex", req.nodeIndex);
TraceEvent("FastRestoreWorker").detail("RecruitedLoaderNodeIndex", req.nodeIndex);
req.reply.send(
RestoreRecruitRoleReply(self->loaderInterf.get().id(), RestoreRole::Loader, self->loaderInterf.get()));
} else if (req.role == RestoreRole::Applier) {
@ -111,12 +111,11 @@ void handleRecruitRoleRequest(RestoreRecruitRoleRequest req, Reference<RestoreWo
DUMPTOKEN(recruited.collectRestoreRoleInterfaces);
DUMPTOKEN(recruited.finishRestore);
actors->add(restoreApplierCore(self->applierInterf.get(), req.nodeIndex, cx));
TraceEvent("FastRestore").detail("RecruitedApplierNodeIndex", req.nodeIndex);
TraceEvent("FastRestoreWorker").detail("RecruitedApplierNodeIndex", req.nodeIndex);
req.reply.send(
RestoreRecruitRoleReply(self->applierInterf.get().id(), RestoreRole::Applier, self->applierInterf.get()));
} else {
TraceEvent(SevError, "FastRestore")
.detail("HandleRecruitRoleRequest", "UnknownRole"); //.detail("Request", req.printable());
TraceEvent(SevError, "FastRestoreWorkerHandleRecruitRoleRequestUnknownRole").detail("Request", req.toString());
}
return;
@ -147,7 +146,7 @@ ACTOR Future<Void> collectRestoreWorkerInterface(Reference<RestoreWorkerData> se
}
break;
}
TraceEvent("FastRestore")
TraceEvent("FastRestoreWorker")
.suppressFor(10.0)
.detail("NotEnoughWorkers", agentValues.size())
.detail("MinWorkers", min_num_workers);
@ -158,7 +157,7 @@ ACTOR Future<Void> collectRestoreWorkerInterface(Reference<RestoreWorkerData> se
}
ASSERT(agents.size() >= min_num_workers); // ASSUMPTION: We must have at least 1 loader and 1 applier
TraceEvent("FastRestore").detail("CollectWorkerInterfaceNumWorkers", self->workerInterfaces.size());
TraceEvent("FastRestoreWorker").detail("CollectWorkerInterfaceNumWorkers", self->workerInterfaces.size());
return Void();
}
@ -182,12 +181,12 @@ ACTOR Future<Void> monitorWorkerLiveness(Reference<RestoreWorkerData> self) {
ACTOR Future<Void> startRestoreWorkerLeader(Reference<RestoreWorkerData> self, RestoreWorkerInterface workerInterf,
Database cx) {
// We must wait for enough time to make sure all restore workers have registered their workerInterfaces into the DB
TraceEvent("FastRestore")
TraceEvent("FastRestoreWorker")
.detail("Master", workerInterf.id())
.detail("WaitForRestoreWorkerInterfaces",
SERVER_KNOBS->FASTRESTORE_NUM_LOADERS + SERVER_KNOBS->FASTRESTORE_NUM_APPLIERS);
wait(delay(10.0));
TraceEvent("FastRestore")
TraceEvent("FastRestoreWorker")
.detail("Master", workerInterf.id())
.detail("CollectRestoreWorkerInterfaces",
SERVER_KNOBS->FASTRESTORE_NUM_LOADERS + SERVER_KNOBS->FASTRESTORE_NUM_APPLIERS);
@ -236,14 +235,12 @@ ACTOR Future<Void> startRestoreWorker(Reference<RestoreWorkerData> self, Restore
exitRole = handlerTerminateWorkerRequest(req, self, interf, cx);
}
when(wait(exitRole)) {
TraceEvent("FastRestore").detail("RestoreWorkerCore", "ExitRole").detail("NodeID", self->id());
TraceEvent("FastRestoreWorkerCoreExitRole", self->id());
break;
}
}
} catch (Error& e) {
TraceEvent(SevWarn, "FastRestore")
.detail("RestoreWorkerError", e.what())
.detail("RequestType", requestTypeStr);
TraceEvent(SevWarn, "FastRestoreWorkerError").detail("RequestType", requestTypeStr).error(e, true);
break;
}
}

View File

@ -57,6 +57,7 @@ struct RestoreWorkerData : NonCopyable, public ReferenceCounted<RestoreWorkerDa
RestoreWorkerData() = default;
~RestoreWorkerData() {
TraceEvent("RestoreWorkerDataDeleted").detail("WorkerID", workerID.toString());
printf("[Exit] Worker:%s RestoreWorkerData is deleted\n", workerID.toString().c_str());
}

View File

@ -843,7 +843,7 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
double networkMetricsElapsed = networkMetrics.getDouble("Elapsed");
try {
double runLoopBusy = networkMetrics.getDouble("PriorityBusy1");
double runLoopBusy = networkMetrics.getDouble("PriorityStarvedBelow1");
statusObj["run_loop_busy"] = runLoopBusy / networkMetricsElapsed;
}
catch(Error &e) {

View File

@ -60,7 +60,7 @@
#include "fdbmonitor/SimpleIni.h"
#ifdef __linux__
#if defined(__linux__) || defined(__FreeBSD__)
#include <execinfo.h>
#include <signal.h>
#ifdef ALLOC_INSTRUMENTATION
@ -75,6 +75,7 @@
#endif
#include "flow/SimpleOpt.h"
#include <fstream>
#include "flow/actorcompiler.h" // This must be the last #include.
// clang-format off
@ -291,7 +292,7 @@ public:
throw platform_error();
}
permission.set_permissions( &sa );
#elif (defined(__linux__) || defined(__APPLE__))
#elif (defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__))
// There is nothing to do here, since the default permissions are fine
#else
#error Port me!
@ -301,7 +302,7 @@ public:
virtual ~WorldReadablePermissions() {
#ifdef _WIN32
LocalFree( sa.lpSecurityDescriptor );
#elif (defined(__linux__) || defined(__APPLE__))
#elif (defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__))
// There is nothing to do here, since the default permissions are fine
#else
#error Port me!

View File

@ -3848,9 +3848,9 @@ ACTOR Future<Void> replaceInterface( StorageServer* self, StorageServerInterface
loop {
state Future<Void> infoChanged = self->db->onChange();
state Reference<ProxyInfo> proxies( new ProxyInfo(self->db->get().client.proxies, self->db->get().myLocality) );
state Reference<ProxyInfo> proxies( new ProxyInfo(self->db->get().client.proxies) );
choose {
when( GetStorageServerRejoinInfoReply _rep = wait( proxies->size() ? loadBalance( proxies, &MasterProxyInterface::getStorageServerRejoinInfo, GetStorageServerRejoinInfoRequest(ssi.id(), ssi.locality.dcId()) ) : Never() ) ) {
when( GetStorageServerRejoinInfoReply _rep = wait( proxies->size() ? basicLoadBalance( proxies, &MasterProxyInterface::getStorageServerRejoinInfo, GetStorageServerRejoinInfoRequest(ssi.id(), ssi.locality.dcId()) ) : Never() ) ) {
state GetStorageServerRejoinInfoReply rep = _rep;
try {
tr.reset();

View File

@ -48,6 +48,8 @@
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#endif
#if defined(__linux__) || defined(__FreeBSD__)
#ifdef USE_GPERFTOOLS
#include "gperftools/profiler.h"
#include "gperftools/heap-profiler.h"
@ -526,7 +528,7 @@ ACTOR Future<Void> registrationClient(
}
}
#if defined(__linux__) && defined(USE_GPERFTOOLS)
#if (defined(__linux__) || defined(__FreeBSD__)) && defined(USE_GPERFTOOLS)
//A set of threads that should be profiled
std::set<std::thread::id> profiledThreads;
@ -538,7 +540,7 @@ int filter_in_thread(void *arg) {
//Enables the calling thread to be profiled
void registerThreadForProfiling() {
#if defined(__linux__) && defined(USE_GPERFTOOLS)
#if (defined(__linux__) || defined(__FreeBSD__)) && defined(USE_GPERFTOOLS)
//Not sure if this is actually needed, but a call to backtrace was advised here:
//http://groups.google.com/group/google-perftools/browse_thread/thread/0dfd74532e038eb8/2686d9f24ac4365f?pli=1
profiledThreads.insert(std::this_thread::get_id());
@ -552,7 +554,7 @@ void registerThreadForProfiling() {
void updateCpuProfiler(ProfilerRequest req) {
switch (req.type) {
case ProfilerRequest::Type::GPROF:
#if defined(__linux__) && defined(USE_GPERFTOOLS) && !defined(VALGRIND)
#if (defined(__linux__) || defined(__FreeBSD__)) && defined(USE_GPERFTOOLS) && !defined(VALGRIND)
switch (req.action) {
case ProfilerRequest::Action::ENABLE: {
const char *path = (const char*)req.outputFile.begin();

View File

@ -205,7 +205,7 @@ struct AtomicOpsWorkload : TestWorkload {
} catch( Error &e ) {
if (e.code() == 1021) {
self->ubsum += intValue;
TraceEvent(SevWarnAlways, "TxnCommitUnknownResult")
TraceEvent(SevInfo, "TxnCommitUnknownResult")
.detail("Value", intValue)
.detail("LogKey", logDebugKey.first)
.detail("OpsKey", opsKey);

View File

@ -86,7 +86,7 @@ struct AtomicRestoreWorkload : TestWorkload {
TraceEvent("AtomicRestore_RestoreStart");
if (self->fastRestore) { // New fast parallel restore
TraceEvent(SevWarnAlways, "AtomicParallelRestore");
TraceEvent(SevInfo, "AtomicParallelRestore");
wait(backupAgent.atomicParallelRestore(cx, BackupAgentBase::getDefaultTag(), self->backupRanges,
StringRef(), StringRef()));
} else { // Old style restore

View File

@ -321,7 +321,11 @@ struct VersionStampWorkload : TestWorkload {
versionStampValue = value.withSuffix(LiteralStringRef("\x00\x00\x00\x00"));
}
state bool ryw = deterministicRandom()->coinflip();
loop{
if (!ryw) {
tr.setOption(FDBTransactionOptions::READ_YOUR_WRITES_DISABLE);
}
state bool error = false;
state Error err;
//TraceEvent("VST_CommitBegin").detail("Key", printable(key)).detail("VsKey", printable(versionStampKey)).detail("Clear", printable(range));

View File

@ -94,6 +94,13 @@ elseif(WIN32)
target_link_libraries(flow PUBLIC winmm.lib)
target_link_libraries(flow PUBLIC psapi.lib)
endif()
if(CMAKE_SYSTEM_NAME STREQUAL "FreeBSD")
set (FLOW_LIBS ${FLOW_LIBS} execinfo devstat)
find_library(EIO eio)
if(EIO)
target_link_libraries(flow PUBLIC ${EIO})
endif()
endif()
target_link_libraries(flow PRIVATE ${FLOW_LIBS})
if(USE_VALGRIND)
target_link_libraries(flow PUBLIC Valgrind)

View File

@ -41,6 +41,10 @@
#include <linux/mman.h>
#endif
#ifdef __FreeBSD__
#include <sys/mman.h>
#endif
#define FAST_ALLOCATOR_DEBUG 0
#ifdef _MSC_VER
@ -54,6 +58,8 @@
#elif defined(__GNUG__)
#ifdef __linux__
#define INIT_SEG __attribute__ ((init_priority (1000)))
#elif defined(__FreeBSD__)
#define INIT_SEG __attribute__ ((init_priority (1000)))
#elif defined(__APPLE__)
#pragma message "init_priority is not supported on this platform; will this be a problem?"
#define INIT_SEG

View File

@ -207,6 +207,10 @@ void FlowKnobs::initialize(bool randomize, bool isSimulated) {
init( FUTURE_VERSION_BACKOFF_GROWTH, 2.0 );
init( LOAD_BALANCE_MAX_BAD_OPTIONS, 1 ); //should be the same as MAX_MACHINES_FALLING_BEHIND
init( LOAD_BALANCE_PENALTY_IS_BAD, true );
init( BASIC_LOAD_BALANCE_UPDATE_RATE, 2.0 );
init( BASIC_LOAD_BALANCE_MAX_CHANGE, 0.05 );
init( BASIC_LOAD_BALANCE_MAX_PROB, 2.0 );
init( BASIC_LOAD_BALANCE_BUCKETS, 40 );
// Health Monitor
init( FAILURE_DETECTION_DELAY, 4.0 ); if( randomize && BUGGIFY ) FAILURE_DETECTION_DELAY = 1.0;

View File

@ -226,6 +226,10 @@ public:
double FUTURE_VERSION_BACKOFF_GROWTH;
int LOAD_BALANCE_MAX_BAD_OPTIONS;
bool LOAD_BALANCE_PENALTY_IS_BAD;
double BASIC_LOAD_BALANCE_UPDATE_RATE;
double BASIC_LOAD_BALANCE_MAX_CHANGE;
double BASIC_LOAD_BALANCE_MAX_PROB;
int BASIC_LOAD_BALANCE_BUCKETS;
// Health Monitor
int FAILURE_DETECTION_DELAY;

View File

@ -55,7 +55,7 @@ intptr_t g_stackYieldLimit = 0;
using namespace boost::asio::ip;
#if defined(__linux__)
#if defined(__linux__) || defined(__FreeBSD__)
#include <execinfo.h>
std::atomic<int64_t> net2RunLoopIterations(0);

View File

@ -104,6 +104,39 @@
#include <sys/sysmacros.h>
#endif
#ifdef __FreeBSD__
/* Needed for processor affinity */
#include <sys/sched.h>
/* Needed for getProcessorTime and setpriority */
#include <sys/syscall.h>
/* Needed for setpriority */
#include <sys/resource.h>
/* Needed for crash handler */
#include <sys/signal.h>
/* Needed for proc info */
#include <sys/user.h>
/* Needed for vm info */
#include <sys/param.h>
#include <sys/sysctl.h>
#include <sys/vmmeter.h>
#include <sys/cpuset.h>
#include <sys/resource.h>
/* Needed for sysctl info */
#include <sys/sysctl.h>
#include <sys/fcntl.h>
/* Needed for network info */
#include <net/if.h>
#include <net/if_mib.h>
#include <net/if_var.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netinet/tcp_var.h>
/* Needed for device info */
#include <devstat.h>
#include <kvm.h>
#include <libutil.h>
#endif
#ifdef __APPLE__
#include <sys/uio.h>
#include <sys/syslimits.h>
@ -203,7 +236,7 @@ double getProcessorTimeThread() {
throw platform_error();
}
return FiletimeAsInt64(ftKernel) / double(1e7) + FiletimeAsInt64(ftUser) / double(1e7);
#elif defined(__linux__)
#elif defined(__linux__) || defined(__FreeBSD__)
return getProcessorTimeGeneric(RUSAGE_THREAD);
#elif defined(__APPLE__)
/* No RUSAGE_THREAD so we use the lower level interface */
@ -255,6 +288,29 @@ uint64_t getResidentMemoryUsage() {
rssize *= sysconf(_SC_PAGESIZE);
return rssize;
#elif defined(__FreeBSD__)
uint64_t rssize = 0;
int status;
pid_t ppid = getpid();
int pidinfo[4];
pidinfo[0] = CTL_KERN;
pidinfo[1] = KERN_PROC;
pidinfo[2] = KERN_PROC_PID;
pidinfo[3] = (int)ppid;
struct kinfo_proc procstk;
size_t len = sizeof(procstk);
status = sysctl(pidinfo, nitems(pidinfo), &procstk, &len, NULL, 0);
if (status < 0){
TraceEvent(SevError, "GetResidentMemoryUsage").GetLastError();
throw platform_error();
}
rssize = (uint64_t)procstk.ki_rssize;
return rssize;
#elif defined(_WIN32)
PROCESS_MEMORY_COUNTERS_EX pmc;
@ -292,6 +348,29 @@ uint64_t getMemoryUsage() {
vmsize *= sysconf(_SC_PAGESIZE);
return vmsize;
#elif defined(__FreeBSD__)
uint64_t vmsize = 0;
int status;
pid_t ppid = getpid();
int pidinfo[4];
pidinfo[0] = CTL_KERN;
pidinfo[1] = KERN_PROC;
pidinfo[2] = KERN_PROC_PID;
pidinfo[3] = (int)ppid;
struct kinfo_proc procstk;
size_t len = sizeof(procstk);
status = sysctl(pidinfo, nitems(pidinfo), &procstk, &len, NULL, 0);
if (status < 0){
TraceEvent(SevError, "GetMemoryUsage").GetLastError();
throw platform_error();
}
vmsize = (uint64_t)procstk.ki_size >> PAGE_SHIFT;
return vmsize;
#elif defined(_WIN32)
PROCESS_MEMORY_COUNTERS_EX pmc;
@ -401,6 +480,52 @@ void getMachineRAMInfo(MachineRAMInfo& memInfo) {
memInfo.available = 1024 * (std::max<int64_t>(0, (memFree-lowWatermark) + std::max(pageCache-lowWatermark, pageCache/2) + std::max(slabReclaimable-lowWatermark, slabReclaimable/2)) - usedSwap);
}
memInfo.committed = memInfo.total - memInfo.available;
#elif defined(__FreeBSD__)
int status;
u_int page_size;
u_int free_count;
u_int active_count;
u_int inactive_count;
u_int wire_count;
size_t uint_size;
uint_size = sizeof(page_size);
status = sysctlbyname("vm.stats.vm.v_page_size", &page_size, &uint_size, NULL, 0);
if (status < 0){
TraceEvent(SevError, "GetMachineMemInfo").GetLastError();
throw platform_error();
}
status = sysctlbyname("vm.stats.vm.v_free_count", &free_count, &uint_size, NULL, 0);
if (status < 0){
TraceEvent(SevError, "GetMachineMemInfo").GetLastError();
throw platform_error();
}
status = sysctlbyname("vm.stats.vm.v_active_count", &active_count, &uint_size, NULL, 0);
if (status < 0){
TraceEvent(SevError, "GetMachineMemInfo").GetLastError();
throw platform_error();
}
status = sysctlbyname("vm.stats.vm.v_inactive_count", &inactive_count, &uint_size, NULL, 0);
if (status < 0){
TraceEvent(SevError, "GetMachineMemInfo").GetLastError();
throw platform_error();
}
status = sysctlbyname("vm.stats.vm.v_wire_count", &wire_count, &uint_size, NULL, 0);
if (status < 0){
TraceEvent(SevError, "GetMachineMemInfo").GetLastError();
throw platform_error();
}
memInfo.total = (int64_t)((free_count + active_count + inactive_count + wire_count) * (u_int64_t)(page_size));
memInfo.available = (int64_t)(free_count * (u_int64_t)(page_size));
memInfo.committed = memInfo.total - memInfo.available;
#elif defined(_WIN32)
MEMORYSTATUSEX mem_status;
@ -456,7 +581,7 @@ Error systemErrorCodeToError() {
void getDiskBytes(std::string const& directory, int64_t& free, int64_t& total) {
INJECT_FAULT( platform_error, "getDiskBytes" );
#if defined(__unixish__)
#ifdef __linux__
#if defined (__linux__) || defined (__FreeBSD__)
struct statvfs buf;
if (statvfs(directory.c_str(), &buf)) {
Error e = systemErrorCodeToError();
@ -755,6 +880,196 @@ dev_t getDeviceId(std::string path) {
#endif
#if defined(__FreeBSD__)
void getNetworkTraffic(const IPAddress ip, uint64_t& bytesSent, uint64_t& bytesReceived,
uint64_t& outSegs, uint64_t& retransSegs) {
INJECT_FAULT( platform_error, "getNetworkTraffic" );
const char* ifa_name = nullptr;
try {
ifa_name = getInterfaceName(ip);
}
catch(Error &e) {
if(e.code() != error_code_platform_error) {
throw;
}
}
if (!ifa_name)
return;
struct ifaddrs *interfaces = NULL;
if (getifaddrs(&interfaces))
{
TraceEvent(SevError, "GetNetworkTrafficError").GetLastError();
throw platform_error();
}
int if_count, i;
int mib[6];
size_t ifmiblen;
struct ifmibdata ifmd;
mib[0] = CTL_NET;
mib[1] = PF_LINK;
mib[2] = NETLINK_GENERIC;
mib[3] = IFMIB_IFDATA;
mib[4] = IFMIB_IFCOUNT;
mib[5] = IFDATA_GENERAL;
ifmiblen = sizeof(ifmd);
for (i = 1; i <= if_count; i++)
{
mib[4] = i;
sysctl(mib, 6, &ifmd, &ifmiblen, (void *)0, 0);
if (!strcmp(ifmd.ifmd_name, ifa_name))
{
bytesSent = ifmd.ifmd_data.ifi_obytes;
bytesReceived = ifmd.ifmd_data.ifi_ibytes;
break;
}
}
freeifaddrs(interfaces);
struct tcpstat tcpstat;
size_t stat_len;
stat_len = sizeof(tcpstat);
int tcpstatus = sysctlbyname("net.inet.tcp.stats", &tcpstat, &stat_len, NULL, 0);
if (tcpstatus < 0) {
TraceEvent(SevError, "GetNetworkTrafficError").GetLastError();
throw platform_error();
}
outSegs = tcpstat.tcps_sndtotal;
retransSegs = tcpstat.tcps_sndrexmitpack;
}
void getMachineLoad(uint64_t& idleTime, uint64_t& totalTime, bool logDetails) {
INJECT_FAULT( platform_error, "getMachineLoad" );
long cur[CPUSTATES], last[CPUSTATES];
size_t cur_sz = sizeof cur;
int cpustate;
long sum;
memset(last, 0, sizeof last);
if (sysctlbyname("kern.cp_time", &cur, &cur_sz, NULL, 0) < 0)
{
TraceEvent(SevError, "GetMachineLoad").GetLastError();
throw platform_error();
}
sum = 0;
for (cpustate = 0; cpustate < CPUSTATES; cpustate++)
{
long tmp = cur[cpustate];
cur[cpustate] -= last[cpustate];
last[cpustate] = tmp;
sum += cur[cpustate];
}
totalTime = (uint64_t)(cur[CP_USER] + cur[CP_NICE] + cur[CP_SYS] + cur[CP_IDLE]);
idleTime = (uint64_t)(cur[CP_IDLE]);
//need to add logging here to TraceEvent
}
void getDiskStatistics(std::string const& directory, uint64_t& currentIOs, uint64_t& busyTicks, uint64_t& reads, uint64_t& writes, uint64_t& writeSectors, uint64_t& readSectors) {
INJECT_FAULT( platform_error, "getDiskStatistics" );
currentIOs = 0;
busyTicks = 0;
reads = 0;
writes = 0;
writeSectors = 0;
readSectors = 0;
struct stat buf;
if (stat(directory.c_str(), &buf)) {
TraceEvent(SevError, "GetDiskStatisticsStatError").detail("Directory", directory).GetLastError();
throw platform_error();
}
static struct statinfo dscur;
double etime;
struct timespec ts;
static int num_devices;
kvm_t *kd = NULL;
etime = ts.tv_nsec * 1e-6;;
int dn;
u_int64_t total_transfers_read, total_transfers_write;
u_int64_t total_blocks_read, total_blocks_write;
u_int64_t queue_len;
long double ms_per_transaction;
dscur.dinfo = (struct devinfo *)calloc(1, sizeof(struct devinfo));
if (dscur.dinfo == NULL) {
TraceEvent(SevError, "GetDiskStatisticsStatError").GetLastError();
throw platform_error();
}
if (devstat_getdevs(kd, &dscur) == -1) {
TraceEvent(SevError, "GetDiskStatisticsStatError").GetLastError();
throw platform_error();
}
num_devices = dscur.dinfo->numdevs;
for (dn = 0; dn < num_devices; dn++)
{
if (devstat_compute_statistics(&dscur.dinfo->devices[dn], NULL, etime,
DSM_MS_PER_TRANSACTION, &ms_per_transaction,
DSM_TOTAL_TRANSFERS_READ, &total_transfers_read,
DSM_TOTAL_TRANSFERS_WRITE, &total_transfers_write,
DSM_TOTAL_BLOCKS_READ, &total_blocks_read,
DSM_TOTAL_BLOCKS_WRITE, &total_blocks_write,
DSM_QUEUE_LENGTH, &queue_len,
DSM_NONE) != 0) {
TraceEvent(SevError, "GetDiskStatisticsStatError").GetLastError();
throw platform_error();
}
currentIOs = queue_len;
busyTicks = (u_int64_t)ms_per_transaction;
reads = total_transfers_read;
writes = total_transfers_write;
writeSectors = total_blocks_read;
readSectors = total_blocks_write;
}
}
dev_t getDeviceId(std::string path) {
struct stat statInfo;
while (true) {
int returnValue = stat(path.c_str(), &statInfo);
if (!returnValue) break;
if (errno == ENOENT) {
path = parentDirectory(path);
} else {
TraceEvent(SevError, "GetDeviceIdError").detail("Path", path).GetLastError();
throw platform_error();
}
}
return statInfo.st_dev;
}
#endif
#ifdef __APPLE__
void getNetworkTraffic(const IPAddress& ip, uint64_t& bytesSent, uint64_t& bytesReceived, uint64_t& outSegs,
uint64_t& retransSegs) {
@ -1277,7 +1592,7 @@ struct OffsetTimer {
return offset + count * secondsPerCount;
}
};
#elif defined(__linux__)
#elif defined(__linux__) || defined(__FreeBSD__)
#define DOUBLETIME(ts) (double(ts.tv_sec) + (ts.tv_nsec * 1e-9))
#ifndef CLOCK_MONOTONIC_RAW
#define CLOCK_MONOTONIC_RAW 4 // Confirmed safe to do with glibc >= 2.11 and kernel >= 2.6.28. No promises with older glibc. Older kernel definitely breaks it.
@ -1342,7 +1657,7 @@ double timer() {
GetSystemTimeAsFileTime(&fileTime);
static_assert( sizeof(fileTime) == sizeof(uint64_t), "FILETIME size wrong" );
return (*(uint64_t*)&fileTime - FILETIME_C_EPOCH) * 100e-9;
#elif defined(__linux__)
#elif defined(__linux__) || defined(__FreeBSD__)
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return double(ts.tv_sec) + (ts.tv_nsec * 1e-9);
@ -1362,7 +1677,7 @@ uint64_t timer_int() {
GetSystemTimeAsFileTime(&fileTime);
static_assert( sizeof(fileTime) == sizeof(uint64_t), "FILETIME size wrong" );
return (*(uint64_t*)&fileTime - FILETIME_C_EPOCH);
#elif defined(__linux__)
#elif defined(__linux__) || defined(__FreeBSD__)
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return uint64_t(ts.tv_sec) * 1e9 + ts.tv_nsec;
@ -1412,7 +1727,7 @@ void setMemoryQuota( size_t limit ) {
}
if (!AssignProcessToJobObject( job, GetCurrentProcess() ))
TraceEvent(SevWarn, "FailedToSetMemoryLimit").GetLastError();
#elif defined(__linux__)
#elif defined(__linux__) || defined(__FreeBSD__)
struct rlimit rlim;
if (getrlimit(RLIMIT_AS, &rlim)) {
TraceEvent(SevError, "GetMemoryLimit").GetLastError();
@ -1514,7 +1829,7 @@ static void *allocateInternal(size_t length, bool largePages) {
flags |= MAP_HUGETLB;
return mmap(NULL, length, PROT_READ|PROT_WRITE, flags, -1, 0);
#elif defined(__APPLE__)
#elif defined(__APPLE__) || defined(__FreeBSD__)
int flags = MAP_PRIVATE|MAP_ANON;
return mmap(NULL, length, PROT_READ|PROT_WRITE, flags, -1, 0);
@ -1588,6 +1903,11 @@ void setAffinity(int proc) {
CPU_ZERO(&set);
CPU_SET(proc, &set);
sched_setaffinity(0, sizeof(cpu_set_t), &set);
#elif defined(__FreeBSD__)
cpuset_t set;
CPU_ZERO(&set);
CPU_SET(proc, &set);
cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_PID, -1,sizeof(set), &set);
#endif
}
@ -1648,7 +1968,7 @@ void renameFile( std::string const& fromPath, std::string const& toPath ) {
//renamedFile();
return;
}
#elif (defined(__linux__) || defined(__APPLE__))
#elif (defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__))
if (!rename( fromPath.c_str(), toPath.c_str() )) {
//FIXME: We cannot inject faults after renaming the file, because we could end up with two asyncFileNonDurable open for the same file
//renamedFile();
@ -1814,7 +2134,7 @@ bool createDirectory( std::string const& directory ) {
Error e = systemErrorCodeToError();
TraceEvent(SevError, "CreateDirectory").detail("Directory", directory).GetLastError().error(e);
throw e;
#elif (defined(__linux__) || defined(__APPLE__))
#elif (defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__))
size_t sep = 0;
do {
sep = directory.find_first_of('/', sep + 1);
@ -1967,8 +2287,7 @@ std::string abspath( std::string const& path, bool resolveLinks, bool mustExist
if (*x == '/')
*x = CANONICAL_PATH_SEPARATOR;
return nameBuffer;
#elif (defined(__linux__) || defined(__APPLE__))
#elif (defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__))
char result[PATH_MAX];
// Must resolve links, so first try realpath on the whole thing
const char *r = realpath( path.c_str(), result );
@ -2031,7 +2350,7 @@ std::string getUserHomeDirectory() {
#ifdef _WIN32
#define FILE_ATTRIBUTE_DATA DWORD
#elif (defined(__linux__) || defined(__APPLE__))
#elif (defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__))
#define FILE_ATTRIBUTE_DATA mode_t
#else
#error Port me!
@ -2040,7 +2359,7 @@ std::string getUserHomeDirectory() {
bool acceptFile( FILE_ATTRIBUTE_DATA fileAttributes, std::string name, std::string extension ) {
#ifdef _WIN32
return !(fileAttributes & FILE_ATTRIBUTE_DIRECTORY) && StringRef(name).endsWith(extension);
#elif (defined(__linux__) || defined(__APPLE__))
#elif (defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__))
return S_ISREG(fileAttributes) && StringRef(name).endsWith(extension);
#else
#error Port me!
@ -2050,7 +2369,7 @@ bool acceptFile( FILE_ATTRIBUTE_DATA fileAttributes, std::string name, std::stri
bool acceptDirectory( FILE_ATTRIBUTE_DATA fileAttributes, std::string name, std::string extension ) {
#ifdef _WIN32
return (fileAttributes & FILE_ATTRIBUTE_DIRECTORY) != 0;
#elif (defined(__linux__) || defined(__APPLE__))
#elif (defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__))
return S_ISDIR(fileAttributes);
#else
#error Port me!
@ -2086,7 +2405,7 @@ std::vector<std::string> findFiles( std::string const& directory, std::string co
}
FindClose(h);
}
#elif (defined(__linux__) || defined(__APPLE__))
#elif (defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__))
DIR *dip;
if ((dip = opendir(directory.c_str())) != NULL) {
@ -2150,7 +2469,7 @@ void findFilesRecursively(std::string path, std::vector<std::string> &out) {
void threadSleep( double seconds ) {
#ifdef _WIN32
Sleep( (DWORD)(seconds * 1e3) );
#elif (defined(__linux__) || defined(__APPLE__))
#elif (defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__))
struct timespec req, rem;
req.tv_sec = seconds;
@ -2201,7 +2520,7 @@ void setCloseOnExec( int fd ) {
THREAD_HANDLE startThread(void (*func) (void *), void *arg) {
return (void *)_beginthread(func, 0, arg);
}
#elif (defined(__linux__) || defined(__APPLE__))
#elif (defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__))
THREAD_HANDLE startThread(void *(*func) (void *), void *arg) {
pthread_t t;
pthread_create(&t, NULL, func, arg);
@ -2214,7 +2533,7 @@ THREAD_HANDLE startThread(void *(*func) (void *), void *arg) {
void waitThread(THREAD_HANDLE thread) {
#ifdef _WIN32
WaitForSingleObject(thread, INFINITE);
#elif (defined(__linux__) || defined(__APPLE__))
#elif (defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__))
pthread_join(thread, NULL);
#else
#error Port me!
@ -2256,7 +2575,7 @@ int64_t fileSize(std::string const& filename) {
return 0;
else
return file_status.st_size;
#elif (defined(__linux__) || defined(__APPLE__))
#elif (defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__))
struct stat file_status;
if(stat(filename.c_str(), &file_status) != 0)
return 0;
@ -2395,7 +2714,7 @@ std::string getDefaultConfigPath() {
return _filepath + "\\foundationdb";
#elif defined(__linux__)
return "/etc/foundationdb";
#elif defined(__APPLE__)
#elif defined(__APPLE__) || defined(__FreeBSD__)
return "/usr/local/etc/foundationdb";
#else
#error Port me!
@ -2524,7 +2843,7 @@ int eraseDirectoryRecursive(std::string const& dir) {
__eraseDirectoryRecurseiveCount = 0;
#ifdef _WIN32
system( ("rd /s /q \"" + dir + "\"").c_str() );
#elif defined(__linux__) || defined(__APPLE__)
#elif defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__)
int error =
nftw(dir.c_str(),
[](const char *fpath, const struct stat *sb, int typeflag, struct FTW *ftwbuf) -> int {
@ -2701,7 +3020,7 @@ void* getImageOffset() { return NULL; }
#endif
bool isLibraryLoaded(const char* lib_path) {
#if !defined(__linux__) && !defined(__APPLE__) && !defined(_WIN32)
#if !defined(__linux__) && !defined(__APPLE__) && !defined(_WIN32) && !defined(__FreeBSD__)
#error Port me!
#endif
@ -2717,7 +3036,7 @@ bool isLibraryLoaded(const char* lib_path) {
}
void* loadLibrary(const char* lib_path) {
#if !defined(__linux__) && !defined(__APPLE__) && !defined(_WIN32)
#if !defined(__linux__) && !defined(__APPLE__) && !defined(_WIN32) && !defined(__FreeBSD__)
#error Port me!
#endif
@ -2774,6 +3093,20 @@ std::string exePath() {
} else {
throw platform_error();
}
#elif defined(__FreeBSD__)
char binPath[2048];
int mib[4];
mib[0] = CTL_KERN;
mib[1] = KERN_PROC;
mib[2] = KERN_PROC_PATHNAME;
mib[3] = -1;
size_t len = sizeof(binPath);
if (sysctl(mib, 4, binPath, &len, NULL, 0) != 0) {
binPath[0] = '\0';
return std::string(binPath);
} else {
throw platform_error();
}
#elif defined(__APPLE__)
uint32_t bufSize = 1024;
std::unique_ptr<char[]> buf(new char[bufSize]);

View File

@ -22,7 +22,7 @@
#define FLOW_PLATFORM_H
#pragma once
#if (defined(__linux__) || defined(__APPLE__))
#if (defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__))
#define __unixish__ 1
#endif
@ -172,6 +172,8 @@ THREAD_HANDLE startThread(void *(func) (void *), void *arg);
#define DYNAMIC_LIB_EXT ".dll"
#elif defined(__linux)
#define DYNAMIC_LIB_EXT ".so"
#elif defined(__FreeBSD__)
#define DYNAMIC_LIB_EXT ".so"
#elif defined(__APPLE__)
#define DYNAMIC_LIB_EXT ".dylib"
#else
@ -422,6 +424,16 @@ inline static uint64_t __rdtsc() {
#endif
#endif
#ifdef __FreeBSD__
#if !(__has_builtin(__rdtsc))
inline static uint64_t __rdtsc() {
uint64_t lo, hi;
asm( "rdtsc" : "=a" (lo), "=d" (hi) );
return( lo | (hi << 32) );
}
#endif
#endif
#ifdef _WIN32
#include <intrin.h>
inline static int32_t interlockedIncrement(volatile int32_t *a) { return _InterlockedIncrement((long*)a); }
@ -531,6 +543,8 @@ inline static void aligned_free(void* ptr) { free(ptr); }
#if (!defined(_ISOC11_SOURCE)) // old libc versions
inline static void* aligned_alloc(size_t alignment, size_t size) { return memalign(alignment, size); }
#endif
#elif defined(__FreeBSD__)
inline static void aligned_free(void* ptr) { free(ptr); }
#elif defined(__APPLE__)
#if !defined(HAS_ALIGNED_ALLOC)
#include <cstdlib>

View File

@ -37,7 +37,7 @@ extern std::string format( const char *form, ... );
Event::Event() {
#ifdef _WIN32
ev = CreateEvent(NULL, FALSE, FALSE, NULL);
#elif defined(__linux__)
#elif defined(__linux__) || defined(__FreeBSD__)
int result = sem_init(&sem, 0, 0);
if (result)
criticalError(FDB_EXIT_INIT_SEMAPHORE, "UnableToInitializeSemaphore", format("Could not initialize semaphore - %s", strerror(errno)).c_str());
@ -54,7 +54,7 @@ Event::Event() {
Event::~Event() {
#ifdef _WIN32
CloseHandle(ev);
#elif defined(__linux__)
#elif defined(__linux__) || defined(__FreeBSD__)
sem_destroy(&sem);
#elif defined(__APPLE__)
semaphore_destroy(self, sem);
@ -66,7 +66,7 @@ Event::~Event() {
void Event::set() {
#ifdef _WIN32
SetEvent(ev);
#elif defined(__linux__)
#elif defined(__linux__) || defined(__FreeBSD__)
sem_post(&sem);
#elif defined(__APPLE__)
semaphore_signal(sem);
@ -78,7 +78,7 @@ void Event::set() {
void Event::block() {
#ifdef _WIN32
WaitForSingleObject(ev, INFINITE);
#elif defined(__linux__)
#elif defined(__linux__) || defined(__FreeBSD__)
int ret;
do {
ret = sem_wait(&sem);

View File

@ -25,7 +25,7 @@
#include "flow/Error.h"
#include "flow/Trace.h"
#ifdef __linux__
#if defined(__linux__) || defined(__FreeBSD__)
#include <semaphore.h>
#endif
@ -115,7 +115,7 @@ public:
private:
#ifdef _WIN32
void* ev;
#elif defined(__linux__)
#elif defined(__linux__) || defined(__FreeBSD__)
sem_t sem;
#elif defined(__APPLE__)
mach_port_t self;

View File

@ -38,7 +38,7 @@ function create_server_environment() {
fi
echo "export PUBLIC_IP=$public_ip" >> $env_file
if [[ -z $FDB_COORDINATOR ]]; then
if [[ -z $FDB_COORDINATOR && -z "$FDB_CLUSTER_FILE_CONTENTS" ]]; then
FDB_CLUSTER_FILE_CONTENTS="docker:docker@$public_ip:$FDB_PORT"
fi