Merge remote-tracking branch 'upstream/release-6.2' into faster-remote-dc

This commit is contained in:
Alex Miller 2019-09-12 18:46:03 -07:00
commit befa0646b3
25 changed files with 444 additions and 176 deletions

View File

@ -18,7 +18,7 @@
# limitations under the License.
cmake_minimum_required(VERSION 3.12)
project(foundationdb
VERSION 6.2.0
VERSION 6.2.4
DESCRIPTION "FoundationDB is a scalable, fault-tolerant, ordered key-value store with full ACID transactions."
HOMEPAGE_URL "http://www.foundationdb.org/"
LANGUAGES C CXX ASM)
@ -75,8 +75,7 @@ message(STATUS "Current git version ${CURRENT_GIT_VERSION}")
# Version information
################################################################################
set(USE_VERSIONS_TARGET OFF CACHE BOOL "Use the deprecated versions.target file")
if(USE_VERSIONS_TARGET)
if(NOT WIN32)
add_custom_target(version_file ALL DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/versions.target)
execute_process(
COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/build/get_version.sh ${CMAKE_CURRENT_SOURCE_DIR}/versions.target
@ -84,8 +83,17 @@ if(USE_VERSIONS_TARGET)
execute_process(
COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/build/get_package_name.sh ${CMAKE_CURRENT_SOURCE_DIR}/versions.target
OUTPUT_VARIABLE FDB_PACKAGE_NAME_WNL)
string(STRIP "${FDB_VERSION_WNL}" FDB_VERSION)
string(STRIP "${FDB_PACKAGE_NAME_WNL}" FDB_PACKAGE_NAME)
string(STRIP "${FDB_VERSION_WNL}" FDB_VERSION_TARGET_FILE)
string(STRIP "${FDB_PACKAGE_NAME_WNL}" FDB_PACKAGE_NAME_TARGET_FILE)
endif()
set(USE_VERSIONS_TARGET OFF CACHE BOOL "Use the deprecated versions.target file")
if(USE_VERSIONS_TARGET)
if (WIN32)
message(FATAL_ERROR "USE_VERSIONS_TARGET is not supported on Windows")
endif()
set(FDB_VERSION ${FDB_VERION_TARGET_FILE})
set(FDB_PACKAGE_NAME ${FDB_PACKAGE_NAME_TARGET_FILE})
set(FDB_VERSION_PLAIN ${FDB_VERSION})
if(NOT FDB_RELEASE)
set(FDB_VERSION "${FDB_VERSION}-PRERELEASE")
@ -94,6 +102,17 @@ else()
set(FDB_PACKAGE_NAME "${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}")
set(FDB_VERSION ${PROJECT_VERSION})
set(FDB_VERSION_PLAIN ${FDB_VERSION})
if(NOT WIN32)
# we need to assert that the cmake version is in sync with the target version
if(NOT (FDB_VERSION STREQUAL FDB_VERSION_TARGET_FILE))
message(SEND_ERROR "The project version in cmake is set to ${FDB_VERSION},\
but versions.target has it at ${FDB_VERSION_TARGET_FILE}")
endif()
if(NOT (FDB_PACKAGE_NAME STREQUAL FDB_PACKAGE_NAME_TARGET_FILE))
message(SEND_ERROR "The package name in cmake is set to ${FDB_PACKAGE_NAME},\
but versions.target has it set to ${FDB_PACKAGE_NAME_TARGET_FILE}")
endif()
endif()
endif()
message(STATUS "FDB version is ${FDB_VERSION}")

View File

@ -92,11 +92,11 @@ bindings/c/foundationdb/fdb_c_options.g.h: bin/vexillographer.exe fdbclient/vexi
bin/fdb_c_performance_test: bindings/c/test/performance_test.c bindings/c/test/test.h fdb_c
@echo "Compiling fdb_c_performance_test"
@$(CC) $(CFLAGS) $(fdb_c_tests_HEADERS) -o $@ bindings/c/test/performance_test.c $(fdb_c_tests_LIBS)
@$(CC) $(CFLAGS) $(fdb_c_tests_HEADERS) -o $@ -c bindings/c/test/performance_test.c
bin/fdb_c_ryw_benchmark: bindings/c/test/ryw_benchmark.c bindings/c/test/test.h fdb_c
@echo "Compiling fdb_c_ryw_benchmark"
@$(CC) $(CFLAGS) $(fdb_c_tests_HEADERS) -o $@ bindings/c/test/ryw_benchmark.c $(fdb_c_tests_LIBS)
@$(CC) $(CFLAGS) $(fdb_c_tests_HEADERS) -o $@ -c bindings/c/test/ryw_benchmark.c
packages/fdb-c-tests-$(VERSION)-$(PLATFORM).tar.gz: bin/fdb_c_performance_test bin/fdb_c_ryw_benchmark
@echo "Packaging $@"

View File

@ -25,6 +25,7 @@ import uuid
import struct
import math
import sys
import functools
from bisect import bisect_left
from fdb import six
@ -72,6 +73,7 @@ def _float_adjust(v, encode):
return six.int2byte(six.indexbytes(v, 0) ^ 0x80) + v[1:]
@functools.total_ordering
class SingleFloat(object):
def __init__(self, value):
if isinstance(value, float):
@ -91,21 +93,9 @@ class SingleFloat(object):
else:
return False
def __ne__(self, other):
return not (self == other)
def __lt__(self, other):
return _compare_floats(self.value, other.value) < 0
def __le__(self, other):
return _compare_floats(self.value, other.value) <= 0
def __gt__(self, other):
return not (self <= other)
def __ge__(self, other):
return not (self < other)
def __str__(self):
return str(self.value)
@ -124,6 +114,7 @@ class SingleFloat(object):
return bool(self.value)
@functools.total_ordering
class Versionstamp(object):
LENGTH = 12
_TR_VERSION_LEN = 10
@ -200,25 +191,22 @@ class Versionstamp(object):
else:
return False
def __ne__(self, other):
return not (self == other)
def __cmp__(self, other):
def __lt__(self, other):
if self.is_complete():
if other.is_complete():
if self.tr_version == other.tr_version:
return cmp(self.user_version, other.user_version)
return self.user_version < other.user_version
else:
return cmp(self.tr_version, other.tr_version)
return self.tr_version < other.tr_version
else:
# All complete are less than all incomplete.
return -1
return True
else:
if other.is_complete():
# All incomplete are greater than all complete
return 1
return False
else:
return cmp(self.user_version, other.user_version)
return self.user_version < other.user_version
def __hash__(self):
if self.tr_version is None:

View File

@ -1,32 +1,65 @@
#!/bin/bash
set -e
OPTIONS=''
# Get compiler version and major version
COMPILER_VER=$("${CC}" -dumpversion)
COMPILER_MAJVER="${COMPILER_VER%%\.*}"
# Add linker, if specified and valid
# The linker to use for building:
# can be LD (system default, default choice), GOLD, LLD, or BFD
if [ -n "${USE_LD}" ] && \
(([[ "${CC}" == *"gcc"* ]] && [ "${COMPILER_MAJVER}" -ge 9 ]) || \
([[ "${CXX}" == *"clang++"* ]] && [ "${COMPILER_MAJVER}" -ge 4 ]) )
then
if [ "${PLATFORM}" == "linux" ]; then
if [ "${USE_LD}" == "BFD" ]; then
OPTIONS+='-fuse-ld=bfd -Wl,--disable-new-dtags'
elif [ "${USE_LD}" == "GOLD" ]; then
OPTIONS+='-fuse-ld=gold -Wl,--disable-new-dtags'
elif [ "${USE_LD}" == "LLD" ]; then
OPTIONS+='-fuse-ld=lld -Wl,--disable-new-dtags'
elif [ "${USE_LD}" != "DEFAULT" ] && [ "${USE_LD}" != "LD" ]; then
echo 'USE_LD must be set to DEFAULT, LD, BFD, GOLD, or LLD!'
exit 1
fi
fi
fi
case $1 in
Application | DynamicLibrary)
echo "Linking $3"
if [ "$1" = "DynamicLibrary" ]; then
OPTIONS="-shared"
if [ "$PLATFORM" = "linux" ]; then
OPTIONS="$OPTIONS -Wl,-z,noexecstack -Wl,-soname,$( basename $3 )"
fi
if [ "$PLATFORM" = "osx" ]; then
OPTIONS="$OPTIONS -Wl,-dylib_install_name -Wl,$( basename $3 )"
fi
else
OPTIONS=
OPTIONS+=" -shared"
if [ "$PLATFORM" = "linux" ]; then
OPTIONS+=" -Wl,-z,noexecstack -Wl,-soname,$( basename $3 )"
elif [ "$PLATFORM" = "osx" ]; then
OPTIONS+=" -Wl,-dylib_install_name -Wl,$( basename $3 )"
fi
fi
OPTIONS=$( eval echo "$OPTIONS $LDFLAGS \$$2_OBJECTS \$$2_LIBS \$$2_STATIC_LIBS_REAL \$$2_LDFLAGS -o $3" )
if echo $OPTIONS | grep -q -- -static-libstdc\+\+ ; then
OPTIONS=$( echo $OPTIONS | sed -e s,-static-libstdc\+\+,, -e s,\$,\ `$CC -print-file-name=libstdc++.a`\ -lm, )
if [[ "${OPTIONS}" == *"-static-libstdc++"* ]]; then
staticlibs=()
staticpaths=''
if [[ "${CC}" == *"gcc"* ]]; then
staticlibs+=('libstdc++.a')
elif [[ "${CXX}" == *"clang++"* ]]; then
staticlibs+=('libc++.a' 'libc++abi.a')
fi
for staticlib in "${staticlibs[@]}"; do
staticpaths+="$("${CC}" -print-file-name="${staticlib}") "
done
OPTIONS=$( echo $OPTIONS | sed -e s,-static-libstdc\+\+,, -e s,\$,\ "${staticpaths}"\ -lm, )
fi
case $PLATFORM in
osx)
if echo $OPTIONS | grep -q -- -static-libgcc ; then
if [[ "${OPTIONS}" == *"-static-libgcc"* ]]; then
$( $CC -### $OPTIONS 2>&1 | grep '^ ' | sed -e s,^\ ,, -e s,-lgcc[^\ ]*,,g -e s,\",,g -e s,\$,\ `$CC -print-file-name=libgcc_eh.a`, -e s,10.8.2,10.6, )
else
$CC $OPTIONS

View File

@ -4,7 +4,7 @@ set(ALLOC_INSTRUMENTATION OFF CACHE BOOL "Instrument alloc")
set(WITH_UNDODB OFF CACHE BOOL "Use rr or undodb")
set(USE_ASAN OFF CACHE BOOL "Compile with address sanitizer")
set(FDB_RELEASE OFF CACHE BOOL "This is a building of a final release")
set(USE_LD "LD" CACHE STRING "The linker to use for building: can be LD (system default, default choice), GOLD, or LLD")
set(USE_LD "DEFAULT" CACHE STRING "The linker to use for building: can be LD (system default, default choice), BFD, GOLD, or LLD")
set(USE_LIBCXX OFF CACHE BOOL "Use libc++")
set(USE_CCACHE OFF CACHE BOOL "Use ccache for compilation if available")
set(RELATIVE_DEBUG_PATHS OFF CACHE BOOL "Use relative file paths in debug info")
@ -89,9 +89,23 @@ else()
set(GCC YES)
endif()
# Use the linker environmental variable, if specified and valid
if ((USE_LD STREQUAL "DEFAULT") AND (NOT "$ENV{USE_LD}" STREQUAL ""))
string(TOUPPER "$ENV{USE_LD}" USE_LDENV)
if (("${USE_LDENV}" STREQUAL "LD") OR ("${USE_LDENV}" STREQUAL "GOLD") OR ("${USE_LDENV}" STREQUAL "LLD") OR ("${USE_LDENV}" STREQUAL "BFD") OR ("${USE_LDENV}" STREQUAL "DEFAULT"))
set(USE_LD "${USE_LDENV}")
else()
message (FATAL_ERROR "USE_LD must be set to DEFAULT, LD, BFD, GOLD, or LLD!")
endif()
endif()
# check linker flags.
if ((NOT (USE_LD STREQUAL "LD")) AND (NOT (USE_LD STREQUAL "GOLD")) AND (NOT (USE_LD STREQUAL "LLD")))
message (FATAL_ERROR "USE_LD must be set to LD, GOLD, or LLD!")
if (USE_LD STREQUAL "DEFAULT")
set(USE_LD "LD")
else()
if ((NOT (USE_LD STREQUAL "LD")) AND (NOT (USE_LD STREQUAL "GOLD")) AND (NOT (USE_LD STREQUAL "LLD")) AND (NOT (USE_LD STREQUAL "BFD")))
message (FATAL_ERROR "USE_LD must be set to DEFAULT, LD, BFD, GOLD, or LLD!")
endif()
endif()
# if USE_LD=LD, then we don't do anything, defaulting to whatever system
@ -99,6 +113,11 @@ else()
# implies the default xcode linker, and other distros may choose others by
# default).
if(USE_LD STREQUAL "BFD")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fuse-ld=bfd -Wl,--disable-new-dtags")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fuse-ld=bfd -Wl,--disable-new-dtags")
endif()
if(USE_LD STREQUAL "GOLD")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fuse-ld=gold -Wl,--disable-new-dtags")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fuse-ld=gold -Wl,--disable-new-dtags")

View File

@ -2,7 +2,7 @@
Release Notes
#############
6.2.3
6.2.4
=====
Performance
@ -73,7 +73,8 @@ Bindings
* Go: The Go bindings now require Go version 1.11 or later.
* Go: Finalizers could run too early leading to undefined behavior. `(PR #1451) <https://github.com/apple/foundationdb/pull/1451>`_.
* Added a transaction option to control the field length of keys and values in debug transaction logging in order to avoid truncation. `(PR #1844) <https://github.com/apple/foundationdb/pull/1844>`_.
* Added a transaction option to control whether ``get_addresses_for_key`` includes a port in the address. This will be deprecated in api version 700, and addresses will include ports by default. `(PR #2060) <https://github.com/apple/foundationdb/pull/2060>`_.
* Added a transaction option to control whether ``get_addresses_for_key`` includes a port in the address. This will be deprecated in api version 700, and addresses will include ports by default. [6.2.4] `(PR #2060) <https://github.com/apple/foundationdb/pull/2060>`_.
* Python: ``Versionstamp`` comparisons didn't work in Python 3. [6.2.4] `(PR #2089) <https://github.com/apple/foundationdb/pull/2089>`_.
Other Changes
-------------
@ -94,6 +95,7 @@ Other Changes
* Added a ``no_wait`` option to the ``fdbcli`` exclude command to avoid blocking. `(PR #1852) <https://github.com/apple/foundationdb/pull/1852>`_.
* Idle clusters will fsync much less frequently. `(PR #1697) <https://github.com/apple/foundationdb/pull/1697>`_.
* CMake is now the official build system. The Makefile based build system is deprecated.
* The incompatible client list in status (``cluster.incompatible_connections``) may now spuriously include clients that use the multi-version API to try connecting to the cluster at multiple versions.
Fixes only impacting 6.2.0+
---------------------------

View File

@ -155,7 +155,7 @@ struct OpenDatabaseCoordRequest {
UID knownClientInfoID;
Key clusterKey;
vector<NetworkAddress> coordinators;
ReplyPromise< struct ClientDBInfo > reply;
ReplyPromise< CachedSerialization<struct ClientDBInfo> > reply;
template <class Ar>
void serialize(Ar& ar) {

View File

@ -609,11 +609,11 @@ ACTOR Future<Void> getClientInfoFromLeader( Reference<AsyncVar<Optional<ClusterC
} else {
resetReply(req);
}
req.knownClientInfoID = clientData->clientInfo->get().id;
req.knownClientInfoID = clientData->clientInfo->get().read().id;
choose {
when( ClientDBInfo ni = wait( brokenPromiseToNever( knownLeader->get().get().clientInterface.openDatabase.getReply( req ) ) ) ) {
TraceEvent("MonitorLeaderForProxiesGotClientInfo", knownLeader->get().get().clientInterface.id()).detail("Proxy0", ni.proxies.size() ? ni.proxies[0].id() : UID()).detail("ClientID", ni.id);
clientData->clientInfo->set(ni);
clientData->clientInfo->set(CachedSerialization<ClientDBInfo>(ni));
}
when( wait( knownLeader->onChange() ) ) {}
}
@ -649,7 +649,7 @@ ACTOR Future<Void> monitorLeaderForProxies( Key clusterKey, vector<NetworkAddres
ClientDBInfo outInfo;
outInfo.id = deterministicRandom()->randomUniqueID();
outInfo.forward = leader.get().first.serializedInfo;
clientData->clientInfo->set(outInfo);
clientData->clientInfo->set(CachedSerialization<ClientDBInfo>(outInfo));
TraceEvent("MonitorLeaderForProxiesForwarding").detail("NewConnStr", leader.get().first.serializedInfo.toString());
return Void();
}
@ -709,11 +709,11 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration( Reference<ClusterCo
incorrectTime = Optional<double>();
}
state ErrorOr<ClientDBInfo> rep = wait( clientLeaderServer.openDatabase.tryGetReply( req, TaskPriority::CoordinationReply ) );
state ErrorOr<CachedSerialization<ClientDBInfo>> rep = wait( clientLeaderServer.openDatabase.tryGetReply( req, TaskPriority::CoordinationReply ) );
if (rep.present()) {
if( rep.get().forward.present() ) {
TraceEvent("MonitorProxiesForwarding").detail("NewConnStr", rep.get().forward.get().toString()).detail("OldConnStr", info.intermediateConnFile->getConnectionString().toString());
info.intermediateConnFile = Reference<ClusterConnectionFile>(new ClusterConnectionFile(connFile->getFilename(), ClusterConnectionString(rep.get().forward.get().toString())));
if( rep.get().read().forward.present() ) {
TraceEvent("MonitorProxiesForwarding").detail("NewConnStr", rep.get().read().forward.get().toString()).detail("OldConnStr", info.intermediateConnFile->getConnectionString().toString());
info.intermediateConnFile = Reference<ClusterConnectionFile>(new ClusterConnectionFile(connFile->getFilename(), ClusterConnectionString(rep.get().read().forward.get().toString())));
return info;
}
if(connFile != info.intermediateConnFile) {
@ -729,7 +729,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration( Reference<ClusterCo
info.hasConnected = true;
connFile->notifyConnected();
auto& ni = rep.get();
auto& ni = rep.get().mutate();
if(ni.proxies.size() > CLIENT_KNOBS->MAX_CLIENT_PROXY_CONNECTIONS) {
std::vector<UID> proxyUIDs;
for(auto& proxy : ni.proxies) {
@ -747,7 +747,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration( Reference<ClusterCo
ni.proxies = lastProxies;
}
clientInfo->set( rep.get() );
clientInfo->set( rep.get().read() );
successIdx = idx;
} else if(idx == successIdx) {
wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY));

View File

@ -42,11 +42,11 @@ struct ClientStatusInfo {
struct ClientData {
std::map<NetworkAddress, ClientStatusInfo> clientStatusInfoMap;
Reference<AsyncVar<ClientDBInfo>> clientInfo;
Reference<AsyncVar<CachedSerialization<ClientDBInfo>>> clientInfo;
OpenDatabaseRequest getRequest();
ClientData() : clientInfo( new AsyncVar<ClientDBInfo>( ClientDBInfo() ) ) {}
ClientData() : clientInfo( new AsyncVar<CachedSerialization<ClientDBInfo>>( CachedSerialization<ClientDBInfo>() ) ) {}
};
template <class LeaderInterface>

View File

@ -89,7 +89,7 @@ class ClusterControllerData {
public:
struct DBInfo {
Reference<AsyncVar<ClientDBInfo>> clientInfo;
Reference<AsyncVar<ServerDBInfo>> serverInfo;
Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> serverInfo;
ProcessIssuesMap workersWithIssues;
std::map<NetworkAddress, double> incompatibleConnections;
AsyncTrigger forceMasterFailure;
@ -105,34 +105,37 @@ public:
DBInfo() : masterRegistrationCount(0), recoveryStalled(false), forceRecovery(false), unfinishedRecoveries(0), logGenerations(0),
clientInfo( new AsyncVar<ClientDBInfo>( ClientDBInfo() ) ),
serverInfo( new AsyncVar<ServerDBInfo>( ServerDBInfo() ) ),
serverInfo( new AsyncVar<CachedSerialization<ServerDBInfo>>( CachedSerialization<ServerDBInfo>() ) ),
db( DatabaseContext::create( clientInfo, Future<Void>(), LocalityData(), true, TaskPriority::DefaultEndpoint, true ) ) // SOMEDAY: Locality!
{
}
void setDistributor(const DataDistributorInterface& interf) {
ServerDBInfo newInfo = serverInfo->get();
CachedSerialization<ServerDBInfo> newInfoCache = serverInfo->get();
auto& newInfo = newInfoCache.mutate();
newInfo.id = deterministicRandom()->randomUniqueID();
newInfo.distributor = interf;
serverInfo->set( newInfo );
serverInfo->set( newInfoCache );
}
void setRatekeeper(const RatekeeperInterface& interf) {
ServerDBInfo newInfo = serverInfo->get();
CachedSerialization<ServerDBInfo> newInfoCache = serverInfo->get();
auto& newInfo = newInfoCache.mutate();
newInfo.id = deterministicRandom()->randomUniqueID();
newInfo.ratekeeper = interf;
serverInfo->set( newInfo );
serverInfo->set( newInfoCache );
}
void clearInterf(ProcessClass::ClassType t) {
ServerDBInfo newInfo = serverInfo->get();
CachedSerialization<ServerDBInfo> newInfoCache = serverInfo->get();
auto& newInfo = newInfoCache.mutate();
newInfo.id = deterministicRandom()->randomUniqueID();
if (t == ProcessClass::DataDistributorClass) {
newInfo.distributor = Optional<DataDistributorInterface>();
} else if (t == ProcessClass::RatekeeperClass) {
newInfo.ratekeeper = Optional<RatekeeperInterface>();
}
serverInfo->set( newInfo );
serverInfo->set( newInfoCache );
}
};
@ -444,8 +447,8 @@ public:
fitness = std::max(fitness, ProcessClass::ExcludeFit);
}
if( workerAvailable(it.second, checkStable) && fitness < unacceptableFitness && it.second.details.interf.locality.dcId()==dcId ) {
if ((db.serverInfo->get().distributor.present() && db.serverInfo->get().distributor.get().locality.processId() == it.first) ||
(db.serverInfo->get().ratekeeper.present() && db.serverInfo->get().ratekeeper.get().locality.processId() == it.first)) {
if ((db.serverInfo->get().read().distributor.present() && db.serverInfo->get().read().distributor.get().locality.processId() == it.first) ||
(db.serverInfo->get().read().ratekeeper.present() && db.serverInfo->get().read().ratekeeper.get().locality.processId() == it.first)) {
fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].second.push_back(it.second.details);
} else {
fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].first.push_back(it.second.details);
@ -477,8 +480,8 @@ public:
auto fitness = it.second.details.processClass.machineClassFitness( role );
if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.details.interf.address()) && it.second.details.interf.locality.dcId() == dcId &&
( !minWorker.present() || ( it.second.details.interf.id() != minWorker.get().worker.interf.id() && ( fitness < minWorker.get().fitness || (fitness == minWorker.get().fitness && id_used[it.first] <= minWorker.get().used ) ) ) ) ) {
if ((db.serverInfo->get().distributor.present() && db.serverInfo->get().distributor.get().locality.processId() == it.first) ||
(db.serverInfo->get().ratekeeper.present() && db.serverInfo->get().ratekeeper.get().locality.processId() == it.first)) {
if ((db.serverInfo->get().read().distributor.present() && db.serverInfo->get().read().distributor.get().locality.processId() == it.first) ||
(db.serverInfo->get().read().ratekeeper.present() && db.serverInfo->get().read().ratekeeper.get().locality.processId() == it.first)) {
fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].second.push_back(it.second.details);
} else {
fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].first.push_back(it.second.details);
@ -877,7 +880,7 @@ public:
}
void checkRecoveryStalled() {
if( (db.serverInfo->get().recoveryState == RecoveryState::RECRUITING || db.serverInfo->get().recoveryState == RecoveryState::ACCEPTING_COMMITS || db.serverInfo->get().recoveryState == RecoveryState::ALL_LOGS_RECRUITED) && db.recoveryStalled ) {
if( (db.serverInfo->get().read().recoveryState == RecoveryState::RECRUITING || db.serverInfo->get().read().recoveryState == RecoveryState::ACCEPTING_COMMITS || db.serverInfo->get().read().recoveryState == RecoveryState::ALL_LOGS_RECRUITED) && db.recoveryStalled ) {
if (db.config.regions.size() > 1) {
auto regions = db.config.regions;
if(clusterControllerDcId.get() == regions[0].dcId) {
@ -891,7 +894,7 @@ public:
//FIXME: determine when to fail the cluster controller when a primaryDC has not been set
bool betterMasterExists() {
ServerDBInfo dbi = db.serverInfo->get();
const ServerDBInfo dbi = db.serverInfo->get().read();
if(dbi.recoveryState < RecoveryState::ACCEPTING_COMMITS) {
return false;
@ -1086,7 +1089,7 @@ public:
ASSERT(masterProcessId.present());
if (processId == masterProcessId) return false;
auto& dbInfo = db.serverInfo->get();
auto& dbInfo = db.serverInfo->get().read();
for (const MasterProxyInterface& interf : dbInfo.client.proxies) {
if (interf.locality.processId() == processId) return true;
}
@ -1109,7 +1112,7 @@ public:
std::map<Optional<Standalone<StringRef>>, int> idUsed;
updateKnownIds(&idUsed);
auto& dbInfo = db.serverInfo->get();
auto& dbInfo = db.serverInfo->get().read();
for (const auto& tlogset : dbInfo.logSystemConfig.tLogs) {
for (const auto& tlog: tlogset.tLogs) {
if (tlog.present()) {
@ -1164,12 +1167,13 @@ public:
gotFullyRecoveredConfig(false), startTime(now()), datacenterVersionDifference(0),
versionDifferenceUpdated(false), recruitingDistributor(false), recruitRatekeeper(false)
{
auto serverInfo = db.serverInfo->get();
CachedSerialization<ServerDBInfo> newInfoCache = db.serverInfo->get();
auto& serverInfo = newInfoCache.mutate();
serverInfo.id = deterministicRandom()->randomUniqueID();
serverInfo.masterLifetime.ccID = id;
serverInfo.clusterInterface = ccInterface;
serverInfo.myLocality = locality;
db.serverInfo->set( serverInfo );
db.serverInfo->set( newInfoCache );
cx = openDBOnServer(db.serverInfo, TaskPriority::DefaultEndpoint, true, true);
}
@ -1204,7 +1208,7 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
continue;
}
RecruitMasterRequest rmq;
rmq.lifetime = db->serverInfo->get().masterLifetime;
rmq.lifetime = db->serverInfo->get().read().masterLifetime;
rmq.forceRecovery = db->forceRecovery;
cluster->masterProcessId = masterWorker.worker.interf.locality.processId();
@ -1224,17 +1228,19 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
db->masterRegistrationCount = 0;
db->recoveryStalled = false;
auto dbInfo = ServerDBInfo();
auto cachedInfo = CachedSerialization<ServerDBInfo>();
auto& dbInfo = cachedInfo.mutate();
dbInfo.master = iMaster;
dbInfo.id = deterministicRandom()->randomUniqueID();
dbInfo.masterLifetime = db->serverInfo->get().masterLifetime;
dbInfo.masterLifetime = db->serverInfo->get().read().masterLifetime;
++dbInfo.masterLifetime;
dbInfo.clusterInterface = db->serverInfo->get().clusterInterface;
dbInfo.distributor = db->serverInfo->get().distributor;
dbInfo.ratekeeper = db->serverInfo->get().ratekeeper;
dbInfo.clusterInterface = db->serverInfo->get().read().clusterInterface;
dbInfo.distributor = db->serverInfo->get().read().distributor;
dbInfo.ratekeeper = db->serverInfo->get().read().ratekeeper;
TraceEvent("CCWDB", cluster->id).detail("Lifetime", dbInfo.masterLifetime.toString()).detail("ChangeID", dbInfo.id);
db->serverInfo->set( dbInfo );
db->serverInfo->set( cachedInfo );
state Future<Void> spinDelay = delay(SERVER_KNOBS->MASTER_SPIN_DELAY); // Don't retry master recovery more than once per second, but don't delay the "first" recovery after more than a second of normal operation
@ -1271,16 +1277,16 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
ACTOR Future<Void> clusterGetServerInfo(ClusterControllerData::DBInfo* db, UID knownServerInfoID,
Standalone<VectorRef<StringRef>> issues,
std::vector<NetworkAddress> incompatiblePeers,
ReplyPromise<ServerDBInfo> reply) {
ReplyPromise<CachedSerialization<ServerDBInfo>> reply) {
state Optional<UID> issueID;
setIssues(db->workersWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID);
for(auto it : incompatiblePeers) {
db->incompatibleConnections[it] = now() + SERVER_KNOBS->INCOMPATIBLE_PEERS_LOGGING_INTERVAL;
}
while (db->serverInfo->get().id == knownServerInfoID) {
while (db->serverInfo->get().read().id == knownServerInfoID) {
choose {
when (wait( db->serverInfo->onChange() )) {}
when (wait( yieldedFuture(db->serverInfo->onChange()) )) {}
when (wait( delayJittered( 300 ) )) { break; } // The server might be long gone!
}
}
@ -1372,11 +1378,11 @@ void checkOutstandingStorageRequests( ClusterControllerData* self ) {
}
void checkBetterDDOrRK(ClusterControllerData* self) {
if (!self->masterProcessId.present() || self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
if (!self->masterProcessId.present() || self->db.serverInfo->get().read().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
return;
}
const ServerDBInfo& db = self->db.serverInfo->get();
auto& db = self->db.serverInfo->get().read();
auto bestFitnessForRK = self->getBestFitnessForRoleInDatacenter(ProcessClass::Ratekeeper);
auto bestFitnessForDD = self->getBestFitnessForRoleInDatacenter(ProcessClass::DataDistributor);
@ -1422,7 +1428,7 @@ ACTOR Future<Void> doCheckOutstandingRequests( ClusterControllerData* self ) {
self->checkRecoveryStalled();
if (self->betterMasterExists()) {
self->db.forceMasterFailure.trigger();
TraceEvent("MasterRegistrationKill", self->id).detail("MasterId", self->db.serverInfo->get().master.id());
TraceEvent("MasterRegistrationKill", self->id).detail("MasterId", self->db.serverInfo->get().read().master.id());
}
} catch( Error &e ) {
if(e.code() != error_code_operation_failed && e.code() != error_code_no_more_servers) {
@ -1719,8 +1725,8 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
//make sure the request comes from an active database
auto db = &self->db;
if ( db->serverInfo->get().master.id() != req.id || req.registrationCount <= db->masterRegistrationCount ) {
TraceEvent("MasterRegistrationNotFound", self->id).detail("MasterId", req.id).detail("ExistingId", db->serverInfo->get().master.id()).detail("RegCount", req.registrationCount).detail("ExistingRegCount", db->masterRegistrationCount);
if ( db->serverInfo->get().read().master.id() != req.id || req.registrationCount <= db->masterRegistrationCount ) {
TraceEvent("MasterRegistrationNotFound", self->id).detail("MasterId", req.id).detail("ExistingId", db->serverInfo->get().read().master.id()).detail("RegCount", req.registrationCount).detail("ExistingRegCount", db->masterRegistrationCount);
return;
}
@ -1753,7 +1759,8 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
}
bool isChanged = false;
auto dbInfo = self->db.serverInfo->get();
auto cachedInfo = self->db.serverInfo->get();
auto& dbInfo = cachedInfo.mutate();
if (dbInfo.recoveryState != req.recoveryState) {
dbInfo.recoveryState = req.recoveryState;
@ -1794,7 +1801,7 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
if( isChanged ) {
dbInfo.id = deterministicRandom()->randomUniqueID();
self->db.serverInfo->set( dbInfo );
self->db.serverInfo->set( cachedInfo );
}
checkOutstandingRequests(self);
@ -1855,7 +1862,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
if( info == self->id_worker.end() ) {
self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, newPriorityInfo, req.degraded );
if (!self->masterProcessId.present() && w.locality.processId() == self->db.serverInfo->get().master.locality.processId()) {
if (!self->masterProcessId.present() && w.locality.processId() == self->db.serverInfo->get().read().master.locality.processId()) {
self->masterProcessId = w.locality.processId();
}
checkOutstandingRequests( self );
@ -1879,7 +1886,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
TEST(true); // Received an old worker registration request.
}
if (req.distributorInterf.present() && !self->db.serverInfo->get().distributor.present() &&
if (req.distributorInterf.present() && !self->db.serverInfo->get().read().distributor.present() &&
self->clusterControllerDcId == req.distributorInterf.get().locality.dcId() &&
!self->recruitingDistributor) {
const DataDistributorInterface& di = req.distributorInterf.get();
@ -1896,7 +1903,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
self->id_worker[w.locality.processId()].haltRatekeeper = brokenPromiseToNever(req.ratekeeperInterf.get().haltRatekeeper.getReply(HaltRatekeeperRequest(self->id)));
} else if(!self->recruitingRatekeeperID.present()) {
const RatekeeperInterface& rki = req.ratekeeperInterf.get();
const auto& ratekeeper = self->db.serverInfo->get().ratekeeper;
const auto& ratekeeper = self->db.serverInfo->get().read().ratekeeper;
TraceEvent("CCRegisterRatekeeper", self->id).detail("RKID", rki.id());
if (ratekeeper.present() && ratekeeper.get().id() != rki.id() && self->id_worker.count(ratekeeper.get().locality.processId())) {
TraceEvent("CCHaltPreviousRatekeeper", self->id).detail("RKID", ratekeeper.get().id())
@ -2165,12 +2172,13 @@ ACTOR Future<Void> monitorServerInfoConfig(ClusterControllerData::DBInfo* db) {
config = LatencyBandConfig::parse(configVal.get());
}
ServerDBInfo serverInfo = db->serverInfo->get();
auto cachedInfo = db->serverInfo->get();
auto& serverInfo = cachedInfo.mutate();
if(config != serverInfo.latencyBandConfig) {
TraceEvent("LatencyBandConfigChanged").detail("Present", config.present());
serverInfo.id = deterministicRandom()->randomUniqueID();
serverInfo.latencyBandConfig = config;
db->serverInfo->set(serverInfo);
db->serverInfo->set(cachedInfo);
}
state Future<Void> configChangeFuture = tr.watch(latencyBandConfigKey);
@ -2324,7 +2332,7 @@ ACTOR Future<Void> updateDatacenterVersionDifference( ClusterControllerData *sel
state double lastLogTime = 0;
loop {
self->versionDifferenceUpdated = false;
if(self->db.serverInfo->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS && self->db.config.usableRegions == 1) {
if(self->db.serverInfo->get().read().recoveryState >= RecoveryState::ACCEPTING_COMMITS && self->db.config.usableRegions == 1) {
bool oldDifferenceTooLarge = !self->versionDifferenceUpdated || self->datacenterVersionDifference >= SERVER_KNOBS->MAX_VERSION_DIFFERENCE;
self->versionDifferenceUpdated = true;
self->datacenterVersionDifference = 0;
@ -2339,8 +2347,8 @@ ACTOR Future<Void> updateDatacenterVersionDifference( ClusterControllerData *sel
state Optional<TLogInterface> primaryLog;
state Optional<TLogInterface> remoteLog;
if(self->db.serverInfo->get().recoveryState >= RecoveryState::ALL_LOGS_RECRUITED) {
for(auto& logSet : self->db.serverInfo->get().logSystemConfig.tLogs) {
if(self->db.serverInfo->get().read().recoveryState >= RecoveryState::ALL_LOGS_RECRUITED) {
for(auto& logSet : self->db.serverInfo->get().read().logSystemConfig.tLogs) {
if(logSet.isLocal && logSet.locality != tagLocalitySatellite) {
for(auto& tLog : logSet.tLogs) {
if(tLog.present()) {
@ -2441,12 +2449,12 @@ ACTOR Future<DataDistributorInterface> startDataDistributor( ClusterControllerDa
TraceEvent("CCStartDataDistributor", self->id);
loop {
try {
state bool no_distributor = !self->db.serverInfo->get().distributor.present();
while (!self->masterProcessId.present() || self->masterProcessId != self->db.serverInfo->get().master.locality.processId() || self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
state bool no_distributor = !self->db.serverInfo->get().read().distributor.present();
while (!self->masterProcessId.present() || self->masterProcessId != self->db.serverInfo->get().read().master.locality.processId() || self->db.serverInfo->get().read().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
wait(self->db.serverInfo->onChange() || delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY));
}
if (no_distributor && self->db.serverInfo->get().distributor.present()) {
return self->db.serverInfo->get().distributor.get();
if (no_distributor && self->db.serverInfo->get().read().distributor.present()) {
return self->db.serverInfo->get().read().distributor.get();
}
std::map<Optional<Standalone<StringRef>>, int> id_used = self->getUsedIds();
@ -2476,15 +2484,15 @@ ACTOR Future<DataDistributorInterface> startDataDistributor( ClusterControllerDa
}
ACTOR Future<Void> monitorDataDistributor(ClusterControllerData *self) {
while(self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
while(self->db.serverInfo->get().read().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
wait(self->db.serverInfo->onChange());
}
loop {
if ( self->db.serverInfo->get().distributor.present() ) {
wait( waitFailureClient( self->db.serverInfo->get().distributor.get().waitFailure, SERVER_KNOBS->DD_FAILURE_TIME ) );
if ( self->db.serverInfo->get().read().distributor.present() ) {
wait( waitFailureClient( self->db.serverInfo->get().read().distributor.get().waitFailure, SERVER_KNOBS->DD_FAILURE_TIME ) );
TraceEvent("CCDataDistributorDied", self->id)
.detail("DistributorId", self->db.serverInfo->get().distributor.get().id());
.detail("DistributorId", self->db.serverInfo->get().read().distributor.get().id());
self->db.clearInterf(ProcessClass::DataDistributorClass);
} else {
self->recruitingDistributor = true;
@ -2501,11 +2509,11 @@ ACTOR Future<Void> startRatekeeper(ClusterControllerData *self) {
TraceEvent("CCStartRatekeeper", self->id);
loop {
try {
state bool no_ratekeeper = !self->db.serverInfo->get().ratekeeper.present();
while (!self->masterProcessId.present() || self->masterProcessId != self->db.serverInfo->get().master.locality.processId() || self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
state bool no_ratekeeper = !self->db.serverInfo->get().read().ratekeeper.present();
while (!self->masterProcessId.present() || self->masterProcessId != self->db.serverInfo->get().read().master.locality.processId() || self->db.serverInfo->get().read().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
wait(self->db.serverInfo->onChange() || delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY));
}
if (no_ratekeeper && self->db.serverInfo->get().ratekeeper.present()) {
if (no_ratekeeper && self->db.serverInfo->get().read().ratekeeper.present()) {
// Existing ratekeeper registers while waiting, so skip.
return Void();
}
@ -2525,7 +2533,7 @@ ACTOR Future<Void> startRatekeeper(ClusterControllerData *self) {
if (interf.present()) {
self->recruitRatekeeper.set(false);
self->recruitingRatekeeperID = interf.get().id();
const auto& ratekeeper = self->db.serverInfo->get().ratekeeper;
const auto& ratekeeper = self->db.serverInfo->get().read().ratekeeper;
TraceEvent("CCRatekeeperRecruited", self->id).detail("Addr", worker.interf.address()).detail("RKID", interf.get().id());
if (ratekeeper.present() && ratekeeper.get().id() != interf.get().id() && self->id_worker.count(ratekeeper.get().locality.processId())) {
TraceEvent("CCHaltRatekeeperAfterRecruit", self->id).detail("RKID", ratekeeper.get().id())
@ -2550,16 +2558,16 @@ ACTOR Future<Void> startRatekeeper(ClusterControllerData *self) {
}
ACTOR Future<Void> monitorRatekeeper(ClusterControllerData *self) {
while(self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
while(self->db.serverInfo->get().read().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
wait(self->db.serverInfo->onChange());
}
loop {
if ( self->db.serverInfo->get().ratekeeper.present() && !self->recruitRatekeeper.get() ) {
if ( self->db.serverInfo->get().read().ratekeeper.present() && !self->recruitRatekeeper.get() ) {
choose {
when(wait(waitFailureClient( self->db.serverInfo->get().ratekeeper.get().waitFailure, SERVER_KNOBS->RATEKEEPER_FAILURE_TIME ))) {
when(wait(waitFailureClient( self->db.serverInfo->get().read().ratekeeper.get().waitFailure, SERVER_KNOBS->RATEKEEPER_FAILURE_TIME ))) {
TraceEvent("CCRatekeeperDied", self->id)
.detail("RKID", self->db.serverInfo->get().ratekeeper.get().id());
.detail("RKID", self->db.serverInfo->get().read().ratekeeper.get().id());
self->db.clearInterf(ProcessClass::RatekeeperClass);
}
when(wait(self->recruitRatekeeper.onChange())) {}

View File

@ -209,7 +209,7 @@ TEST_CASE("/fdbserver/Coordination/localGenerationReg/simple") {
}
ACTOR Future<Void> openDatabase(ClientData* db, int* clientCount, Reference<AsyncVar<bool>> hasConnectedClients, OpenDatabaseCoordRequest req) {
if(db->clientInfo->get().id != req.knownClientInfoID && !db->clientInfo->get().forward.present()) {
if(db->clientInfo->get().read().id != req.knownClientInfoID && !db->clientInfo->get().read().forward.present()) {
req.reply.send( db->clientInfo->get() );
return Void();
}
@ -218,9 +218,9 @@ ACTOR Future<Void> openDatabase(ClientData* db, int* clientCount, Reference<Asyn
db->clientStatusInfoMap[req.reply.getEndpoint().getPrimaryAddress()] = ClientStatusInfo(req.traceLogGroup, req.supportedVersions, req.issues);
while (db->clientInfo->get().id == req.knownClientInfoID && !db->clientInfo->get().forward.present()) {
while (db->clientInfo->get().read().id == req.knownClientInfoID && !db->clientInfo->get().read().forward.present()) {
choose {
when (wait( db->clientInfo->onChange() )) {}
when (wait( yieldedFuture(db->clientInfo->onChange()) )) {}
when (wait( delayJittered( SERVER_KNOBS->CLIENT_REGISTER_INTERVAL ) )) { break; } // The client might be long gone!
}
}
@ -315,10 +315,12 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
ClientDBInfo outInfo;
outInfo.id = deterministicRandom()->randomUniqueID();
outInfo.forward = req.conn.toString();
clientData.clientInfo->set(outInfo);
clientData.clientInfo->set(CachedSerialization<ClientDBInfo>(outInfo));
req.reply.send( Void() );
ASSERT(!hasConnectedClients->get());
return Void();
if(!hasConnectedClients->get()) {
return Void();
}
nextInterval = Future<Void>();
}
when ( wait(nextInterval.isValid() ? nextInterval : Never()) ) {
if (!availableLeaders.size() && !availableCandidates.size() && !notify.size() &&
@ -478,7 +480,7 @@ ACTOR Future<Void> leaderServer(LeaderElectionRegInterface interf, OnDemandStore
ClientDBInfo info;
info.id = deterministicRandom()->randomUniqueID();
info.forward = forward.get().serializedInfo;
req.reply.send( info );
req.reply.send( CachedSerialization<ClientDBInfo>(info) );
} else {
regs.getInterface(req.clusterKey, id).openDatabase.send( req );
}

View File

@ -1810,7 +1810,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// The desired machine team number is not the same with the desired server team number
// in notEnoughTeamsForAServer() below, because the machineTeamRemover() does not
// remove a machine team with the most number of machine teams.
if (m.second->machineTeams.size() < targetMachineTeamNumPerMachine) {
if (m.second->machineTeams.size() < targetMachineTeamNumPerMachine && isMachineHealthy(m.second)) {
return true;
}
}
@ -1831,7 +1831,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
int targetTeamNumPerServer = (SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (configuration.storageTeamSize + 1)) / 2;
ASSERT(targetTeamNumPerServer > 0);
for (auto& s : server_info) {
if (s.second->teams.size() < targetTeamNumPerServer) {
if (s.second->teams.size() < targetTeamNumPerServer && !server_status.get(s.first).isUnhealthy()) {
return true;
}
}

View File

@ -67,7 +67,7 @@ struct GetServerDBInfoRequest {
UID knownServerInfoID;
Standalone<VectorRef<StringRef>> issues;
std::vector<NetworkAddress> incompatiblePeers;
ReplyPromise< struct ServerDBInfo > reply;
ReplyPromise< CachedSerialization<struct ServerDBInfo> > reply;
template <class Ar>
void serialize(Ar& ar) {

View File

@ -566,7 +566,7 @@ struct RolesInfo {
};
ACTOR static Future<JsonBuilderObject> processStatusFetcher(
Reference<AsyncVar<struct ServerDBInfo>> db, std::vector<WorkerDetails> workers, WorkerEvents pMetrics,
Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> db, std::vector<WorkerDetails> workers, WorkerEvents pMetrics,
WorkerEvents mMetrics, WorkerEvents nMetrics, WorkerEvents errors, WorkerEvents traceFileOpenErrors,
WorkerEvents programStarts, std::map<std::string, std::vector<JsonBuilderObject>> processIssues,
vector<std::pair<StorageServerInterface, EventMap>> storageServers,
@ -624,18 +624,18 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
state RolesInfo roles;
roles.addRole("master", db->get().master);
roles.addRole("cluster_controller", db->get().clusterInterface.clientInterface);
roles.addRole("master", db->get().read().master);
roles.addRole("cluster_controller", db->get().read().clusterInterface.clientInterface);
if (db->get().distributor.present()) {
roles.addRole("data_distributor", db->get().distributor.get());
if (db->get().read().distributor.present()) {
roles.addRole("data_distributor", db->get().read().distributor.get());
}
if (db->get().ratekeeper.present()) {
roles.addRole("ratekeeper", db->get().ratekeeper.get());
if (db->get().read().ratekeeper.present()) {
roles.addRole("ratekeeper", db->get().read().ratekeeper.get());
}
for(auto& tLogSet : db->get().logSystemConfig.tLogs) {
for(auto& tLogSet : db->get().read().logSystemConfig.tLogs) {
for(auto& it : tLogSet.logRouters) {
if(it.present()) {
roles.addRole("router", it.interf());
@ -643,7 +643,7 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
}
}
for(auto& old : db->get().logSystemConfig.oldTLogs) {
for(auto& old : db->get().read().logSystemConfig.oldTLogs) {
for(auto& tLogSet : old.tLogs) {
for(auto& it : tLogSet.logRouters) {
if(it.present()) {
@ -686,7 +686,7 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
}
state std::vector<ResolverInterface>::const_iterator res;
state std::vector<ResolverInterface> resolvers = db->get().resolvers;
state std::vector<ResolverInterface> resolvers = db->get().read().resolvers;
for(res = resolvers.begin(); res != resolvers.end(); ++res) {
roles.addRole( "resolver", *res );
wait(yield());
@ -1464,8 +1464,8 @@ ACTOR static Future<vector<std::pair<StorageServerInterface, EventMap>>> getStor
return results;
}
ACTOR static Future<vector<std::pair<TLogInterface, EventMap>>> getTLogsAndMetrics(Reference<AsyncVar<struct ServerDBInfo>> db, std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
vector<TLogInterface> servers = db->get().logSystemConfig.allPresentLogs();
ACTOR static Future<vector<std::pair<TLogInterface, EventMap>>> getTLogsAndMetrics(Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> db, std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
vector<TLogInterface> servers = db->get().read().logSystemConfig.allPresentLogs();
vector<std::pair<TLogInterface, EventMap>> results =
wait(getServerMetrics(servers, address_workers, std::vector<std::string>{ "TLogMetrics" }));
@ -1550,7 +1550,7 @@ JsonBuilderObject getPerfLimit(TraceEventFields const& ratekeeper, double transP
return perfLimit;
}
ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<struct ServerDBInfo>> db, vector<WorkerDetails> workers, WorkerDetails mWorker, WorkerDetails rkWorker,
ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> db, vector<WorkerDetails> workers, WorkerDetails mWorker, WorkerDetails rkWorker,
JsonBuilderObject *qos, JsonBuilderObject *data_overlay, std::set<std::string> *incomplete_reasons, Future<ErrorOr<vector<std::pair<StorageServerInterface, EventMap>>>> storageServerFuture)
{
state JsonBuilderObject statusObj;
@ -1565,7 +1565,7 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
for (auto const& w : workers) {
workersMap[w.interf.address()] = w;
}
for (auto &p : db->get().client.proxies) {
for (auto &p : db->get().read().client.proxies) {
auto worker = getWorker(workersMap, p.address());
if (worker.present())
proxyStatFutures.push_back(timeoutError(worker.get().interf.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("ProxyMetrics"))), 1.0));
@ -1772,11 +1772,11 @@ ACTOR static Future<JsonBuilderObject> clusterSummaryStatisticsFetcher(WorkerEve
return statusObj;
}
static JsonBuilderArray oldTlogFetcher(int* oldLogFaultTolerance, Reference<AsyncVar<struct ServerDBInfo>> db, std::unordered_map<NetworkAddress, WorkerInterface> const& address_workers) {
static JsonBuilderArray oldTlogFetcher(int* oldLogFaultTolerance, Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> db, std::unordered_map<NetworkAddress, WorkerInterface> const& address_workers) {
JsonBuilderArray oldTlogsArray;
if(db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS) {
for(auto it : db->get().logSystemConfig.oldTLogs) {
if(db->get().read().recoveryState >= RecoveryState::ACCEPTING_COMMITS) {
for(auto it : db->get().read().logSystemConfig.oldTLogs) {
JsonBuilderObject statusObj;
JsonBuilderArray logsObj;
Optional<int32_t> sat_log_replication_factor, sat_log_write_anti_quorum, sat_log_fault_tolerance, log_replication_factor, log_write_anti_quorum, log_fault_tolerance, remote_log_replication_factor, remote_log_fault_tolerance;
@ -2022,7 +2022,7 @@ ACTOR Future<JsonBuilderObject> layerStatusFetcher(Database cx, JsonBuilderArray
return statusObj;
}
ACTOR Future<JsonBuilderObject> lockedStatusFetcher(Reference<AsyncVar<struct ServerDBInfo>> db, JsonBuilderArray *messages, std::set<std::string> *incomplete_reasons) {
ACTOR Future<JsonBuilderObject> lockedStatusFetcher(Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> db, JsonBuilderArray *messages, std::set<std::string> *incomplete_reasons) {
state JsonBuilderObject statusObj;
state Database cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, false); // Open a new database connection that isn't lock-aware
@ -2063,7 +2063,7 @@ ACTOR Future<JsonBuilderObject> lockedStatusFetcher(Reference<AsyncVar<struct Se
// constructs the cluster section of the json status output
ACTOR Future<StatusReply> clusterGetStatus(
Reference<AsyncVar<struct ServerDBInfo>> db,
Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> db,
Database cx,
vector<WorkerDetails> workers,
ProcessIssuesMap workerIssues,
@ -2083,7 +2083,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
try {
// Get the master Worker interface
Optional<WorkerDetails> _mWorker = getWorker( workers, db->get().master.address() );
Optional<WorkerDetails> _mWorker = getWorker( workers, db->get().read().master.address() );
if (_mWorker.present()) {
mWorker = _mWorker.get();
} else {
@ -2091,11 +2091,11 @@ ACTOR Future<StatusReply> clusterGetStatus(
}
// Get the DataDistributor worker interface
Optional<WorkerDetails> _ddWorker;
if (db->get().distributor.present()) {
_ddWorker = getWorker( workers, db->get().distributor.get().address() );
if (db->get().read().distributor.present()) {
_ddWorker = getWorker( workers, db->get().read().distributor.get().address() );
}
if (!db->get().distributor.present() || !_ddWorker.present()) {
if (!db->get().read().distributor.present() || !_ddWorker.present()) {
messages.push_back(JsonString::makeMessage("unreachable_dataDistributor_worker", "Unable to locate the data distributor worker."));
} else {
ddWorker = _ddWorker.get();
@ -2103,11 +2103,11 @@ ACTOR Future<StatusReply> clusterGetStatus(
// Get the Ratekeeper worker interface
Optional<WorkerDetails> _rkWorker;
if (db->get().ratekeeper.present()) {
_rkWorker = getWorker( workers, db->get().ratekeeper.get().address() );
if (db->get().read().ratekeeper.present()) {
_rkWorker = getWorker( workers, db->get().read().ratekeeper.get().address() );
}
if (!db->get().ratekeeper.present() || !_rkWorker.present()) {
if (!db->get().read().ratekeeper.present() || !_rkWorker.present()) {
messages.push_back(JsonString::makeMessage("unreachable_ratekeeper_worker", "Unable to locate the ratekeeper worker."));
} else {
rkWorker = _rkWorker.get();
@ -2165,8 +2165,8 @@ ACTOR Future<StatusReply> clusterGetStatus(
state WorkerEvents programStarts = workerEventsVec[5].present() ? workerEventsVec[5].get().first : WorkerEvents();
state JsonBuilderObject statusObj;
if(db->get().recoveryCount > 0) {
statusObj["generation"] = db->get().recoveryCount;
if(db->get().read().recoveryCount > 0) {
statusObj["generation"] = db->get().read().recoveryCount;
}
state std::map<std::string, std::vector<JsonBuilderObject>> processIssues =
@ -2242,7 +2242,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
state std::vector<JsonBuilderObject> workerStatuses = wait(getAll(futures2));
int oldLogFaultTolerance = 100;
if(db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS && db->get().logSystemConfig.oldTLogs.size() > 0) {
if(db->get().read().recoveryState >= RecoveryState::ACCEPTING_COMMITS && db->get().read().logSystemConfig.oldTLogs.size() > 0) {
statusObj["old_logs"] = oldTlogFetcher(&oldLogFaultTolerance, db, address_workers);
}

View File

@ -34,7 +34,7 @@ void setIssues(ProcessIssuesMap& issueMap, NetworkAddress const& addr, VectorRef
void removeIssues(ProcessIssuesMap& issueMap, NetworkAddress const& addr, Optional<UID>& issueID);
Future<StatusReply> clusterGetStatus( Reference<AsyncVar<struct ServerDBInfo>> const& db, Database const& cx, vector<WorkerDetails> const& workers,
Future<StatusReply> clusterGetStatus( Reference<AsyncVar<CachedSerialization<struct ServerDBInfo>>> const& db, Database const& cx, vector<WorkerDetails> const& workers,
ProcessIssuesMap const& workerIssues, std::map<NetworkAddress, std::pair<double, OpenDatabaseRequest>>* const& clientStatus, ServerCoordinators const& coordinators, std::vector<NetworkAddress> const& incompatibleConnections, Version const& datacenterVersionDifference );
#endif

View File

@ -51,9 +51,9 @@ struct TLogInterface {
TLogInterface() {}
explicit TLogInterface(LocalityData locality) : uniqueID( deterministicRandom()->randomUniqueID() ), locality(locality) { sharedTLogID = uniqueID; }
TLogInterface(UID sharedTLogID, LocalityData locality) : uniqueID( deterministicRandom()->randomUniqueID() ), sharedTLogID(sharedTLogID), locality(locality) {}
TLogInterface(UID uniqueID, UID sharedTLogID, LocalityData locality) : uniqueID(uniqueID), sharedTLogID(sharedTLogID), locality(locality) {}
explicit TLogInterface(const LocalityData& locality) : uniqueID( deterministicRandom()->randomUniqueID() ), locality(locality) { sharedTLogID = uniqueID; }
TLogInterface(UID sharedTLogID, const LocalityData& locality) : uniqueID( deterministicRandom()->randomUniqueID() ), sharedTLogID(sharedTLogID), locality(locality) {}
TLogInterface(UID uniqueID, UID sharedTLogID, const LocalityData& locality) : uniqueID(uniqueID), sharedTLogID(sharedTLogID), locality(locality) {}
UID id() const { return uniqueID; }
UID getSharedTLogID() const { return sharedTLogID; }
std::string toString() const { return id().shortString(); }

View File

@ -434,6 +434,8 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
Version queueCommittingVersion;
Version knownCommittedVersion, durableKnownCommittedVersion, minKnownCommittedVersion;
Version queuePoppedVersion;
Version minPoppedTagVersion;
Tag minPoppedTag;
Deque<std::pair<Version, Standalone<VectorRef<uint8_t>>>> messageBlocks;
std::vector<std::vector<Reference<TagData>>> tag_data; //tag.locality | tag.id
@ -498,6 +500,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, int txsTags, UID recruitmentID, ProtocolVersion protocolVersion, std::vector<Tag> tags) : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()),
cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), txsTags(txsTags), recruitmentID(recruitmentID), protocolVersion(protocolVersion),
logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), queuePoppedVersion(0), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()),
minPoppedTagVersion(0), minPoppedTag(invalidTag),
// These are initialized differently on init() or recovery
recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), unrecoveredBefore(1), recoveredAt(1), unpoppedRecoveredTags(0),
logRouterPopToVersion(0), locality(tagLocalityInvalid), execOpCommitInProgress(false)
@ -515,6 +518,8 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
specialCounter(cc, "PersistentDataDurableVersion", [this](){ return this->persistentDataDurableVersion; });
specialCounter(cc, "KnownCommittedVersion", [this](){ return this->knownCommittedVersion; });
specialCounter(cc, "QueuePoppedVersion", [this](){ return this->queuePoppedVersion; });
specialCounter(cc, "MinPoppedTagVersion", [this](){ return this->minPoppedTagVersion; });
specialCounter(cc, "MinPoppedTag", [this](){ return this->minPoppedTag.toString(); });
specialCounter(cc, "SharedBytesInput", [tLogData](){ return tLogData->bytesInput; });
specialCounter(cc, "SharedBytesDurable", [tLogData](){ return tLogData->bytesDurable; });
specialCounter(cc, "SharedOverheadBytesInput", [tLogData](){ return tLogData->overheadBytesInput; });
@ -733,12 +738,20 @@ ACTOR Future<Void> popDiskQueue( TLogData* self, Reference<LogData> logData ) {
minLocation = locationIter->value.first;
minVersion = locationIter->key;
}
logData->minPoppedTagVersion = std::numeric_limits<Version>::max();
for(int tagLocality = 0; tagLocality < logData->tag_data.size(); tagLocality++) {
for(int tagId = 0; tagId < logData->tag_data[tagLocality].size(); tagId++) {
Reference<LogData::TagData> tagData = logData->tag_data[tagLocality][tagId];
if (tagData && tagData->tag.locality != tagLocalityTxs && tagData->tag != txsTag && !tagData->nothingPersistent) {
minLocation = std::min(minLocation, tagData->poppedLocation);
minVersion = std::min(minVersion, tagData->popped);
if (tagData && tagData->tag.locality != tagLocalityTxs && tagData->tag != txsTag) {
if(!tagData->nothingPersistent) {
minLocation = std::min(minLocation, tagData->poppedLocation);
minVersion = std::min(minVersion, tagData->popped);
}
if((!tagData->nothingPersistent || tagData->versionMessages.size()) && tagData->popped < logData->minPoppedTagVersion) {
logData->minPoppedTagVersion = tagData->popped;
logData->minPoppedTag = tagData->tag;
}
}
}
}

View File

@ -413,6 +413,7 @@ void endRole(const Role &role, UID id, std::string reason, bool ok = true, Error
struct ServerDBInfo;
class Database openDBOnServer( Reference<AsyncVar<ServerDBInfo>> const& db, TaskPriority taskID = TaskPriority::DefaultEndpoint, bool enableLocalityLoadBalance = true, bool lockAware = false );
class Database openDBOnServer( Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> const& db, TaskPriority taskID = TaskPriority::DefaultEndpoint, bool enableLocalityLoadBalance = true, bool lockAware = false );
ACTOR Future<Void> extractClusterInterface(Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> a,
Reference<AsyncVar<Optional<struct ClusterInterface>>> b);

View File

@ -545,6 +545,10 @@ ACTOR Future<vector<Standalone<CommitTransactionRef>>> recruitEverything( Refere
if (!self->configuration.isValid()) {
RecoveryStatus::RecoveryStatus status;
if (self->configuration.initialized) {
TraceEvent(SevWarn, "MasterRecoveryInvalidConfiguration", self->dbgid)
.setMaxEventLength(11000)
.setMaxFieldLength(10000)
.detail("Conf", self->configuration.toString());
status = RecoveryStatus::configuration_invalid;
} else if (!self->cstate.prevDBState.tLogs.size()) {
status = RecoveryStatus::configuration_never_created;
@ -665,7 +669,12 @@ ACTOR Future<Void> readTransactionSystemState( Reference<MasterData> self, Refer
self->configuration.fromKeyValues( rawConf );
self->originalConfiguration = self->configuration;
self->hasConfiguration = true;
TraceEvent("MasterRecoveredConfig", self->dbgid).detail("Conf", self->configuration.toString()).trackLatest("RecoveredConfig");
TraceEvent("MasterRecoveredConfig", self->dbgid)
.setMaxEventLength(11000)
.setMaxFieldLength(10000)
.detail("Conf", self->configuration.toString())
.trackLatest("RecoveredConfig");
Standalone<VectorRef<KeyValueRef>> rawLocalities = wait( self->txnStateStore->readRange( tagLocalityListKeys ) );
self->dcId_locality.clear();
@ -797,7 +806,10 @@ void updateConfigForForcedRecovery(Reference<MasterData> self, vector<Standalone
regionJSON["regions"] = self->configuration.getRegionJSON();
regionCommit.mutations.push_back_deep(regionCommit.arena(), MutationRef(MutationRef::SetValue, configKeysPrefix.toString() + "regions", BinaryWriter::toValue(regionJSON, IncludeVersion()).toString()));
self->configuration.applyMutation( regionCommit.mutations.back() ); //modifying the configuration directly does not change the configuration when it is re-serialized unless we call applyMutation
TraceEvent("ForcedRecoveryConfigChange", self->dbgid).detail("Conf", self->configuration.toString());
TraceEvent("ForcedRecoveryConfigChange", self->dbgid)
.setMaxEventLength(11000)
.setMaxFieldLength(10000)
.detail("Conf", self->configuration.toString());
}
initialConfChanges->push_back(regionCommit);
}

View File

@ -465,9 +465,10 @@ public:
struct Counters {
CounterCollection cc;
Counter allQueries, getKeyQueries, getValueQueries, getRangeQueries, finishedQueries, rowsQueried, bytesQueried, watchQueries;
Counter allQueries, getKeyQueries, getValueQueries, getRangeQueries, finishedQueries, rowsQueried, bytesQueried, watchQueries, emptyQueries;
Counter bytesInput, bytesDurable, bytesFetched,
mutationBytes; // Like bytesInput but without MVCC accounting
Counter sampledBytesCleared;
Counter mutations, setMutations, clearRangeMutations, atomicMutations;
Counter updateBatches, updateVersions;
Counter loops;
@ -486,10 +487,12 @@ public:
rowsQueried("RowsQueried", cc),
bytesQueried("BytesQueried", cc),
watchQueries("WatchQueries", cc),
emptyQueries("EmptyQueries", cc),
bytesInput("BytesInput", cc),
bytesDurable("BytesDurable", cc),
bytesFetched("BytesFetched", cc),
mutationBytes("MutationBytes", cc),
sampledBytesCleared("SampledBytesCleared", cc),
mutations("Mutations", cc),
setMutations("SetMutations", cc),
clearRangeMutations("ClearRangeMutations", cc),
@ -881,6 +884,9 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
resultSize = v.get().size();
data->counters.bytesQueried += resultSize;
}
else {
++data->counters.emptyQueries;
}
if( req.debugID.present() )
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
@ -1439,6 +1445,9 @@ ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
resultSize = req.limitBytes - remainingLimitBytes;
data->counters.bytesQueried += resultSize;
data->counters.rowsQueried += r.data.size();
if(r.data.size() == 0) {
++data->counters.emptyQueries;
}
}
} catch (Error& e) {
if(!canReplyWith(e))
@ -3269,6 +3278,8 @@ void StorageServer::byteSampleApplyClear( KeyRangeRef range, Version ver ) {
if(range.begin < allKeys.end) {
//NotifyBytes should not be called for keys past allKeys.end
KeyRangeRef searchRange = KeyRangeRef(range.begin, std::min(range.end, allKeys.end));
counters.sampledBytesCleared += byteSample.sumRange(searchRange.begin, searchRange.end);
auto r = metrics.waitMetricsMap.intersectingRanges(searchRange);
for(auto shard = r.begin(); shard != r.end(); ++shard) {
KeyRangeRef intersectingRange = shard.range() & range;

View File

@ -75,11 +75,23 @@ ACTOR static Future<Void> extractClientInfo( Reference<AsyncVar<ServerDBInfo>> d
}
}
ACTOR static Future<Void> extractClientInfo( Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> db, Reference<AsyncVar<ClientDBInfo>> info ) {
loop {
info->set( db->get().read().client );
wait( db->onChange() );
}
}
Database openDBOnServer( Reference<AsyncVar<ServerDBInfo>> const& db, TaskPriority taskID, bool enableLocalityLoadBalance, bool lockAware ) {
Reference<AsyncVar<ClientDBInfo>> info( new AsyncVar<ClientDBInfo> );
return DatabaseContext::create( info, extractClientInfo(db, info), enableLocalityLoadBalance ? db->get().myLocality : LocalityData(), enableLocalityLoadBalance, taskID, lockAware );
}
Database openDBOnServer( Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> const& db, TaskPriority taskID, bool enableLocalityLoadBalance, bool lockAware ) {
Reference<AsyncVar<ClientDBInfo>> info( new AsyncVar<ClientDBInfo> );
return DatabaseContext::create( info, extractClientInfo(db, info), enableLocalityLoadBalance ? db->get().read().myLocality : LocalityData(), enableLocalityLoadBalance, taskID, lockAware );
}
struct ErrorInfo {
Error error;
const Role &role;
@ -725,11 +737,12 @@ ACTOR Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterContr
}
choose {
when( ServerDBInfo ni = wait( ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().getServerDBInfo.getReply( req ) ) : Never() ) ) {
TraceEvent("GotServerDBInfoChange").detail("ChangeID", ni.id).detail("MasterID", ni.master.id())
.detail("RatekeeperID", ni.ratekeeper.present() ? ni.ratekeeper.get().id() : UID())
.detail("DataDistributorID", ni.distributor.present() ? ni.distributor.get().id() : UID());
ServerDBInfo localInfo = ni;
when( CachedSerialization<ServerDBInfo> ni = wait( ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().getServerDBInfo.getReply( req ) ) : Never() ) ) {
ServerDBInfo localInfo = ni.read();
TraceEvent("GotServerDBInfoChange").detail("ChangeID", localInfo.id).detail("MasterID", localInfo.master.id())
.detail("RatekeeperID", localInfo.ratekeeper.present() ? localInfo.ratekeeper.get().id() : UID())
.detail("DataDistributorID", localInfo.distributor.present() ? localInfo.distributor.get().id() : UID());
localInfo.myLocality = locality;
dbInfo->set(localInfo);
}

View File

@ -96,6 +96,12 @@ struct serializable_traits : std::false_type {
static void serialize(Archiver& ar, T& v);
};
template <class T>
struct serialize_raw : std::false_type {
template <class Context>
static uint8_t* save_raw(Context& context, const T& obj);
};
template <class VectorLike>
struct vector_like_traits : std::false_type {
// Write this at the beginning of the buffer

View File

@ -421,7 +421,8 @@ TEST_CASE("/flow/FlatBuffers/file_identifier") {
TestContext context{arena};
const uint8_t* out;
constexpr FileIdentifier file_identifier{ 1234 };
out = save_members(context, file_identifier);
Y1 y1;
out = save_members(context, file_identifier, y1);
// print_buffer(out, arena.get_size(out));
ASSERT(read_file_identifier(out) == file_identifier);
return Void();

View File

@ -1110,7 +1110,7 @@ uint8_t* save(Context& context, const Root& root, FileIdentifier file_identifier
save_with_vtables(root, vtableset, precompute_size, &vtable_start, file_identifier, context);
uint8_t* out = context.allocate(precompute_size.current_buffer_size);
WriteToBuffer writeToBuffer{ context, precompute_size.current_buffer_size, vtable_start, out,
precompute_size.writeToOffsets.begin() };
precompute_size.writeToOffsets.begin() };
save_with_vtables(root, vtableset, writeToBuffer, &vtable_start, file_identifier, context);
return out;
}
@ -1122,10 +1122,15 @@ void load(Root& root, const uint8_t* in, Context& context) {
} // namespace detail
template <class Context, class... Members>
uint8_t* save_members(Context& context, FileIdentifier file_identifier, Members&... members) {
const auto& root = detail::fake_root(members...);
return detail::save(context, root, file_identifier);
template <class Context, class FirstMember, class... Members>
uint8_t* save_members(Context& context, FileIdentifier file_identifier, const FirstMember& first,
const Members&... members) {
if constexpr (serialize_raw<FirstMember>::value) {
return serialize_raw<FirstMember>::save_raw(context, first);
} else {
const auto& root = detail::fake_root(const_cast<FirstMember&>(first), const_cast<Members&>(members)...);
return detail::save(context, root, file_identifier);
}
}
template <class Context, class... Members>
@ -1158,6 +1163,7 @@ struct EnsureTable
template <class Archive>
void serialize(Archive& ar) {
if constexpr (is_fb_function<Archive>) {
// This is only for vtable collection. Load and save use the LoadSaveHelper specialization below
if constexpr (detail::expect_serialize_member<T>) {
if constexpr (serializable_traits<T>::value) {
serializable_traits<T>::serialize(ar, t);
@ -1172,7 +1178,41 @@ struct EnsureTable
}
}
T& asUnderlyingType() { return t; }
const T& asUnderlyingType() const { return t; }
private:
T t;
};
namespace detail {
// Ensure if there's a LoadSaveHelper specialization available for T it gets used.
// EnsureTable<T> promises its contents are encoded as a flatbuffers table: if T
// already serializes as a table (expect_serialize_member<T>), delegate directly;
// otherwise wrap T in FakeRoot so it gets a table of its own.
template <class T, class Context>
struct LoadSaveHelper<EnsureTable<T>, Context> : Context {
LoadSaveHelper(const Context& context) : Context(context), alreadyATable(context), wrapInTable(context) {}
// Deserializes the wrapped T from `current`, choosing the direct or
// FakeRoot-wrapped path at compile time.
void load(EnsureTable<T>& member, const uint8_t* current) {
if constexpr (expect_serialize_member<T>) {
alreadyATable.load(member.asUnderlyingType(), current);
} else {
// FakeRoot::load writes through the reference it holds, so member is filled in.
FakeRoot<T> t{ member.asUnderlyingType() };
wrapInTable.load(t, current);
}
}
// Serializes the wrapped T and returns the offset of the resulting table.
template <class Writer>
RelativeOffset save(const EnsureTable<T>& member, Writer& writer, const VTableSet* vtables) {
if constexpr (expect_serialize_member<T>) {
return alreadyATable.save(member.asUnderlyingType(), writer, vtables);
} else {
// const_cast needed because FakeRoot holds a non-const reference; save does not mutate.
FakeRoot<T> t{ const_cast<T&>(member.asUnderlyingType()) };
return wrapInTable.save(t, writer, vtables);
}
}
private:
LoadSaveHelper<T, Context> alreadyATable; // path for T already encoded as a table
LoadSaveHelper<FakeRoot<T>, Context> wrapInTable; // path that adds a wrapping table
};
} // namespace detail

View File

@ -235,6 +235,106 @@ struct union_like_traits<ErrorOr<T>> : std::true_type {
}
};
// Wraps a value of type T together with a lazily-built cache of its serialized
// bytes, so that a value sent repeatedly (e.g. ServerDBInfo broadcast to many
// workers) is serialized at most once per mutation. The cache is invalidated
// whenever mutate() hands out a writable reference. Read access via read() is
// cache-preserving. NOTE(review): cacheType/cache are mutable and updated from
// const getCache() with no synchronization — presumably single-threaded use;
// confirm against callers.
template <class T>
class CachedSerialization {
public:
constexpr static FileIdentifier file_identifier = FileIdentifierFor<T>::value;
//FIXME: this code will not work for caching a direct serialization from ObjectWriter, because it adds an ErrorOr,
// we should create a separate SerializeType for direct serialization
// Which serializer produced the bytes currently held in `cache`.
enum class SerializeType { None, Binary, Object };
CachedSerialization() : cacheType(SerializeType::None) {}
explicit CachedSerialization(const T& data) : data(data), cacheType(SerializeType::None) {}
// Read-only access; does not invalidate the cache.
const T& read() const { return data; }
// Writable access; conservatively invalidates the cache since the caller may change data.
T& mutate() {
cacheType = SerializeType::None;
return data;
}
//This should only be called from the ObjectSerializer load function
// Returns the object-serialized image of ErrorOr<EnsureTable<T>>(data),
// building and memoizing it on first use (or after mutate()).
Standalone<StringRef> getCache() const {
if(cacheType != SerializeType::Object) {
cache = ObjectWriter::toValue(ErrorOr<EnsureTable<T>>(data), AssumeVersion(currentProtocolVersion));
cacheType = SerializeType::Object;
}
return cache;
}
// Comparisons look only at the value, never at the cache state.
bool operator == (CachedSerialization<T> const& rhs) const {
return data == rhs.data;
}
bool operator != (CachedSerialization<T> const& rhs) const {
return !(*this == rhs);
}
bool operator < (CachedSerialization<T> const& rhs) const {
return data < rhs.data;
}
template <class Ar>
void serialize(Ar& ar) {
if constexpr (is_fb_function<Ar>) {
// Suppress vtable collection. Save and load are implemented via the specializations below
} else {
if (Ar::isDeserializing) {
// Incoming bytes replace data, so any previously cached image is stale.
cache = Standalone<StringRef>();
cacheType = SerializeType::None;
serializer(ar, data);
} else {
// Binary (non-flatbuffers) save path: memoize the BinaryWriter image
// and copy it out byte-for-byte on every subsequent save.
if (cacheType != SerializeType::Binary) {
cache = BinaryWriter::toValue(data, AssumeVersion(currentProtocolVersion));
cacheType = SerializeType::Binary;
}
ar.serializeBytes(const_cast<uint8_t*>(cache.begin()), cache.size());
}
}
}
private:
T data;
mutable SerializeType cacheType; // which format `cache` currently holds (None if stale)
mutable Standalone<StringRef> cache; // memoized serialized image of `data`
};
// this special case is needed - the code expects
// Standalone<T> and T to be equivalent for serialization
namespace detail {
// Loads a CachedSerialization<T> exactly as a plain T would be loaded
// (deserializing into mutate(), which also invalidates the byte cache).
// Saving through this path is deliberately unsupported: flatbuffers saves of
// cached values must go through the serialize_raw specialization below, which
// emits the memoized bytes instead of rebuilding a table.
template <class T, class Context>
struct LoadSaveHelper<CachedSerialization<T>, Context> : Context {
LoadSaveHelper(const Context& context)
: Context(context), helper(context) {}
void load(CachedSerialization<T>& member, const uint8_t* current) {
// mutate() resets the cache, so the freshly loaded value won't reuse stale bytes.
helper.load(member.mutate(), current);
}
template <class Writer>
RelativeOffset save(const CachedSerialization<T>& member, Writer& writer, const VTableSet* vtables) {
// Unreachable by design — see serialize_raw<ErrorOr<EnsureTable<CachedSerialization<V>>>>.
throw internal_error();
}
private:
LoadSaveHelper<T, Context> helper; // delegate that handles the underlying T
};
} // namespace detail
// Raw-save hook for reply payloads of cached values. When the reply is present,
// copies the memoized object-serialized image from getCache() straight into the
// output buffer; when it holds an error, serializes ErrorOr<EnsureTable<V>> with
// that error so the bytes on the wire match what a non-cached V would produce.
template <class V>
struct serialize_raw<ErrorOr<EnsureTable<CachedSerialization<V>>>> : std::true_type {
template <class Context>
static uint8_t* save_raw(Context& context, const ErrorOr<EnsureTable<CachedSerialization<V>>>& obj) {
auto cache = obj.present() ? obj.get().asUnderlyingType().getCache()
: ObjectWriter::toValue(ErrorOr<EnsureTable<V>>(obj.getError()),
AssumeVersion(currentProtocolVersion));
uint8_t* out = context.allocate(cache.size());
memcpy(out, cache.begin(), cache.size());
return out;
}
};
template <class T>
struct Callback {
Callback<T> *prev, *next;