diff --git a/CMakeLists.txt b/CMakeLists.txt index a1aa4c7335..ebfd0e5224 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,7 +18,7 @@ # limitations under the License. cmake_minimum_required(VERSION 3.12) project(foundationdb - VERSION 6.2.0 + VERSION 6.2.4 DESCRIPTION "FoundationDB is a scalable, fault-tolerant, ordered key-value store with full ACID transactions." HOMEPAGE_URL "http://www.foundationdb.org/" LANGUAGES C CXX ASM) @@ -75,8 +75,7 @@ message(STATUS "Current git version ${CURRENT_GIT_VERSION}") # Version information ################################################################################ -set(USE_VERSIONS_TARGET OFF CACHE BOOL "Use the deprecated versions.target file") -if(USE_VERSIONS_TARGET) +if(NOT WIN32) add_custom_target(version_file ALL DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/versions.target) execute_process( COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/build/get_version.sh ${CMAKE_CURRENT_SOURCE_DIR}/versions.target @@ -84,8 +83,17 @@ if(USE_VERSIONS_TARGET) execute_process( COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/build/get_package_name.sh ${CMAKE_CURRENT_SOURCE_DIR}/versions.target OUTPUT_VARIABLE FDB_PACKAGE_NAME_WNL) - string(STRIP "${FDB_VERSION_WNL}" FDB_VERSION) - string(STRIP "${FDB_PACKAGE_NAME_WNL}" FDB_PACKAGE_NAME) + string(STRIP "${FDB_VERSION_WNL}" FDB_VERSION_TARGET_FILE) + string(STRIP "${FDB_PACKAGE_NAME_WNL}" FDB_PACKAGE_NAME_TARGET_FILE) +endif() + +set(USE_VERSIONS_TARGET OFF CACHE BOOL "Use the deprecated versions.target file") +if(USE_VERSIONS_TARGET) + if (WIN32) + message(FATAL_ERROR "USE_VERSIONS_TARGET is not supported on Windows") + endif() + set(FDB_VERSION ${FDB_VERSION_TARGET_FILE}) + set(FDB_PACKAGE_NAME ${FDB_PACKAGE_NAME_TARGET_FILE}) set(FDB_VERSION_PLAIN ${FDB_VERSION}) if(NOT FDB_RELEASE) set(FDB_VERSION "${FDB_VERSION}-PRERELEASE") @@ -94,6 +102,17 @@ else() set(FDB_PACKAGE_NAME "${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}") set(FDB_VERSION ${PROJECT_VERSION}) set(FDB_VERSION_PLAIN ${FDB_VERSION}) + if(NOT 
WIN32) + # we need to assert that the cmake version is in sync with the target version + if(NOT (FDB_VERSION STREQUAL FDB_VERSION_TARGET_FILE)) + message(SEND_ERROR "The project version in cmake is set to ${FDB_VERSION},\ + but versions.target has it at ${FDB_VERSION_TARGET_FILE}") + endif() + if(NOT (FDB_PACKAGE_NAME STREQUAL FDB_PACKAGE_NAME_TARGET_FILE)) + message(SEND_ERROR "The package name in cmake is set to ${FDB_PACKAGE_NAME},\ + but versions.target has it set to ${FDB_PACKAGE_NAME_TARGET_FILE}") + endif() + endif() endif() message(STATUS "FDB version is ${FDB_VERSION}") diff --git a/bindings/c/local.mk b/bindings/c/local.mk index df2859c6a3..424291c457 100644 --- a/bindings/c/local.mk +++ b/bindings/c/local.mk @@ -92,11 +92,11 @@ bindings/c/foundationdb/fdb_c_options.g.h: bin/vexillographer.exe fdbclient/vexi bin/fdb_c_performance_test: bindings/c/test/performance_test.c bindings/c/test/test.h fdb_c @echo "Compiling fdb_c_performance_test" - @$(CC) $(CFLAGS) $(fdb_c_tests_HEADERS) -o $@ bindings/c/test/performance_test.c $(fdb_c_tests_LIBS) + @$(CC) $(CFLAGS) $(fdb_c_tests_HEADERS) -o $@ -c bindings/c/test/performance_test.c bin/fdb_c_ryw_benchmark: bindings/c/test/ryw_benchmark.c bindings/c/test/test.h fdb_c @echo "Compiling fdb_c_ryw_benchmark" - @$(CC) $(CFLAGS) $(fdb_c_tests_HEADERS) -o $@ bindings/c/test/ryw_benchmark.c $(fdb_c_tests_LIBS) + @$(CC) $(CFLAGS) $(fdb_c_tests_HEADERS) -o $@ -c bindings/c/test/ryw_benchmark.c packages/fdb-c-tests-$(VERSION)-$(PLATFORM).tar.gz: bin/fdb_c_performance_test bin/fdb_c_ryw_benchmark @echo "Packaging $@" diff --git a/bindings/python/fdb/tuple.py b/bindings/python/fdb/tuple.py index 30748ae0eb..379e576cdc 100644 --- a/bindings/python/fdb/tuple.py +++ b/bindings/python/fdb/tuple.py @@ -25,6 +25,7 @@ import uuid import struct import math import sys +import functools from bisect import bisect_left from fdb import six @@ -72,6 +73,7 @@ def _float_adjust(v, encode): return six.int2byte(six.indexbytes(v, 0) ^ 0x80) + 
v[1:] +@functools.total_ordering class SingleFloat(object): def __init__(self, value): if isinstance(value, float): @@ -91,21 +93,9 @@ class SingleFloat(object): else: return False - def __ne__(self, other): - return not (self == other) - def __lt__(self, other): return _compare_floats(self.value, other.value) < 0 - def __le__(self, other): - return _compare_floats(self.value, other.value) <= 0 - - def __gt__(self, other): - return not (self <= other) - - def __ge__(self, other): - return not (self < other) - def __str__(self): return str(self.value) @@ -124,6 +114,7 @@ class SingleFloat(object): return bool(self.value) +@functools.total_ordering class Versionstamp(object): LENGTH = 12 _TR_VERSION_LEN = 10 @@ -200,25 +191,22 @@ class Versionstamp(object): else: return False - def __ne__(self, other): - return not (self == other) - - def __cmp__(self, other): + def __lt__(self, other): if self.is_complete(): if other.is_complete(): if self.tr_version == other.tr_version: - return cmp(self.user_version, other.user_version) + return self.user_version < other.user_version else: - return cmp(self.tr_version, other.tr_version) + return self.tr_version < other.tr_version else: # All complete are less than all incomplete. 
- return -1 + return True else: if other.is_complete(): # All incomplete are greater than all complete - return 1 + return False else: - return cmp(self.user_version, other.user_version) + return self.user_version < other.user_version def __hash__(self): if self.tr_version is None: diff --git a/build/link-wrapper.sh b/build/link-wrapper.sh index 5d24fc83d5..184d0393b1 100755 --- a/build/link-wrapper.sh +++ b/build/link-wrapper.sh @@ -1,32 +1,65 @@ #!/bin/bash set -e +OPTIONS='' + +# Get compiler version and major version +COMPILER_VER=$("${CC}" -dumpversion) +COMPILER_MAJVER="${COMPILER_VER%%\.*}" + +# Add linker, if specified and valid +# The linker to use for building: +# can be LD (system default, default choice), GOLD, LLD, or BFD +if [ -n "${USE_LD}" ] && \ + (([[ "${CC}" == *"gcc"* ]] && [ "${COMPILER_MAJVER}" -ge 9 ]) || \ + ([[ "${CXX}" == *"clang++"* ]] && [ "${COMPILER_MAJVER}" -ge 4 ]) ) +then + if [ "${PLATFORM}" == "linux" ]; then + if [ "${USE_LD}" == "BFD" ]; then + OPTIONS+='-fuse-ld=bfd -Wl,--disable-new-dtags' + elif [ "${USE_LD}" == "GOLD" ]; then + OPTIONS+='-fuse-ld=gold -Wl,--disable-new-dtags' + elif [ "${USE_LD}" == "LLD" ]; then + OPTIONS+='-fuse-ld=lld -Wl,--disable-new-dtags' + elif [ "${USE_LD}" != "DEFAULT" ] && [ "${USE_LD}" != "LD" ]; then + echo 'USE_LD must be set to DEFAULT, LD, BFD, GOLD, or LLD!' 
+ exit 1 + fi + fi +fi case $1 in Application | DynamicLibrary) echo "Linking $3" if [ "$1" = "DynamicLibrary" ]; then - OPTIONS="-shared" - if [ "$PLATFORM" = "linux" ]; then - OPTIONS="$OPTIONS -Wl,-z,noexecstack -Wl,-soname,$( basename $3 )" - fi - if [ "$PLATFORM" = "osx" ]; then - OPTIONS="$OPTIONS -Wl,-dylib_install_name -Wl,$( basename $3 )" - fi - else - OPTIONS= + OPTIONS+=" -shared" + if [ "$PLATFORM" = "linux" ]; then + OPTIONS+=" -Wl,-z,noexecstack -Wl,-soname,$( basename $3 )" + elif [ "$PLATFORM" = "osx" ]; then + OPTIONS+=" -Wl,-dylib_install_name -Wl,$( basename $3 )" + fi fi OPTIONS=$( eval echo "$OPTIONS $LDFLAGS \$$2_OBJECTS \$$2_LIBS \$$2_STATIC_LIBS_REAL \$$2_LDFLAGS -o $3" ) - if echo $OPTIONS | grep -q -- -static-libstdc\+\+ ; then - OPTIONS=$( echo $OPTIONS | sed -e s,-static-libstdc\+\+,, -e s,\$,\ `$CC -print-file-name=libstdc++.a`\ -lm, ) + if [[ "${OPTIONS}" == *"-static-libstdc++"* ]]; then + staticlibs=() + staticpaths='' + if [[ "${CC}" == *"gcc"* ]]; then + staticlibs+=('libstdc++.a') + elif [[ "${CXX}" == *"clang++"* ]]; then + staticlibs+=('libc++.a' 'libc++abi.a') + fi + for staticlib in "${staticlibs[@]}"; do + staticpaths+="$("${CC}" -print-file-name="${staticlib}") " + done + OPTIONS=$( echo $OPTIONS | sed -e s,-static-libstdc\+\+,, -e s,\$,\ "${staticpaths}"\ -lm, ) fi case $PLATFORM in osx) - if echo $OPTIONS | grep -q -- -static-libgcc ; then + if [[ "${OPTIONS}" == *"-static-libgcc"* ]]; then $( $CC -### $OPTIONS 2>&1 | grep '^ ' | sed -e s,^\ ,, -e s,-lgcc[^\ ]*,,g -e s,\",,g -e s,\$,\ `$CC -print-file-name=libgcc_eh.a`, -e s,10.8.2,10.6, ) else $CC $OPTIONS diff --git a/cmake/ConfigureCompiler.cmake b/cmake/ConfigureCompiler.cmake index 3c6c06ea93..41a1162ee3 100644 --- a/cmake/ConfigureCompiler.cmake +++ b/cmake/ConfigureCompiler.cmake @@ -4,7 +4,7 @@ set(ALLOC_INSTRUMENTATION OFF CACHE BOOL "Instrument alloc") set(WITH_UNDODB OFF CACHE BOOL "Use rr or undodb") set(USE_ASAN OFF CACHE BOOL "Compile with address 
sanitizer") set(FDB_RELEASE OFF CACHE BOOL "This is a building of a final release") -set(USE_LD "LD" CACHE STRING "The linker to use for building: can be LD (system default, default choice), GOLD, or LLD") +set(USE_LD "DEFAULT" CACHE STRING "The linker to use for building: can be LD (system default, default choice), BFD, GOLD, or LLD") set(USE_LIBCXX OFF CACHE BOOL "Use libc++") set(USE_CCACHE OFF CACHE BOOL "Use ccache for compilation if available") set(RELATIVE_DEBUG_PATHS OFF CACHE BOOL "Use relative file paths in debug info") @@ -89,9 +89,23 @@ else() set(GCC YES) endif() + # Use the linker environmental variable, if specified and valid + if ((USE_LD STREQUAL "DEFAULT") AND (NOT "$ENV{USE_LD}" STREQUAL "")) + string(TOUPPER "$ENV{USE_LD}" USE_LDENV) + if (("${USE_LDENV}" STREQUAL "LD") OR ("${USE_LDENV}" STREQUAL "GOLD") OR ("${USE_LDENV}" STREQUAL "LLD") OR ("${USE_LDENV}" STREQUAL "BFD") OR ("${USE_LDENV}" STREQUAL "DEFAULT")) + set(USE_LD "${USE_LDENV}") + else() + message (FATAL_ERROR "USE_LD must be set to DEFAULT, LD, BFD, GOLD, or LLD!") + endif() + endif() + # check linker flags. - if ((NOT (USE_LD STREQUAL "LD")) AND (NOT (USE_LD STREQUAL "GOLD")) AND (NOT (USE_LD STREQUAL "LLD"))) - message (FATAL_ERROR "USE_LD must be set to LD, GOLD, or LLD!") + if (USE_LD STREQUAL "DEFAULT") + set(USE_LD "LD") + else() + if ((NOT (USE_LD STREQUAL "LD")) AND (NOT (USE_LD STREQUAL "GOLD")) AND (NOT (USE_LD STREQUAL "LLD")) AND (NOT (USE_LD STREQUAL "BFD"))) + message (FATAL_ERROR "USE_LD must be set to DEFAULT, LD, BFD, GOLD, or LLD!") + endif() endif() # if USE_LD=LD, then we don't do anything, defaulting to whatever system @@ -99,6 +113,11 @@ else() # implies the default xcode linker, and other distros may choose others by # default). 
+ if(USE_LD STREQUAL "BFD") + set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fuse-ld=bfd -Wl,--disable-new-dtags") + set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fuse-ld=bfd -Wl,--disable-new-dtags") + endif() + if(USE_LD STREQUAL "GOLD") set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fuse-ld=gold -Wl,--disable-new-dtags") set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fuse-ld=gold -Wl,--disable-new-dtags") diff --git a/documentation/sphinx/source/release-notes.rst b/documentation/sphinx/source/release-notes.rst index 449c799609..dfe2b0c82e 100644 --- a/documentation/sphinx/source/release-notes.rst +++ b/documentation/sphinx/source/release-notes.rst @@ -2,7 +2,7 @@ Release Notes ############# -6.2.3 +6.2.4 ===== Performance @@ -73,7 +73,8 @@ Bindings * Go: The Go bindings now require Go version 1.11 or later. * Go: Finalizers could run too early leading to undefined behavior. `(PR #1451) `_. * Added a transaction option to control the field length of keys and values in debug transaction logging in order to avoid truncation. `(PR #1844) `_. -* Added a transaction option to control the whether ``get_addresses_for_key`` includes a port in the address. This will be deprecated in api version 700, and addresses will include ports by default. `(PR #2060) `_. +* Added a transaction option to control the whether ``get_addresses_for_key`` includes a port in the address. This will be deprecated in api version 700, and addresses will include ports by default. [6.2.4] `(PR #2060) `_. +* Python: ``Versionstamp`` comparisons didn't work in Python 3. [6.2.4] `(PR #2089) `_. Other Changes ------------- @@ -94,6 +95,7 @@ Other Changes * Added a ``no_wait`` option to the ``fdbcli`` exclude command to avoid blocking. `(PR #1852) `_. * Idle clusters will fsync much less frequently. `(PR #1697) `_. * CMake is now the official build system. The Makefile based build system is deprecated. 
+* The incompatible client list in status (``cluster.incompatible_connections``) may now spuriously include clients that use the multi-version API to try connecting to the cluster at multiple versions. Fixes only impacting 6.2.0+ --------------------------- diff --git a/fdbclient/CoordinationInterface.h b/fdbclient/CoordinationInterface.h index f090c3abdf..78667db6d9 100644 --- a/fdbclient/CoordinationInterface.h +++ b/fdbclient/CoordinationInterface.h @@ -155,7 +155,7 @@ struct OpenDatabaseCoordRequest { UID knownClientInfoID; Key clusterKey; vector coordinators; - ReplyPromise< struct ClientDBInfo > reply; + ReplyPromise< CachedSerialization > reply; template void serialize(Ar& ar) { diff --git a/fdbclient/MonitorLeader.actor.cpp b/fdbclient/MonitorLeader.actor.cpp index 36008bfff7..df3a6737ab 100644 --- a/fdbclient/MonitorLeader.actor.cpp +++ b/fdbclient/MonitorLeader.actor.cpp @@ -609,11 +609,11 @@ ACTOR Future getClientInfoFromLeader( ReferenceclientInfo->get().id; + req.knownClientInfoID = clientData->clientInfo->get().read().id; choose { when( ClientDBInfo ni = wait( brokenPromiseToNever( knownLeader->get().get().clientInterface.openDatabase.getReply( req ) ) ) ) { TraceEvent("MonitorLeaderForProxiesGotClientInfo", knownLeader->get().get().clientInterface.id()).detail("Proxy0", ni.proxies.size() ? 
ni.proxies[0].id() : UID()).detail("ClientID", ni.id); - clientData->clientInfo->set(ni); + clientData->clientInfo->set(CachedSerialization(ni)); } when( wait( knownLeader->onChange() ) ) {} } @@ -649,7 +649,7 @@ ACTOR Future monitorLeaderForProxies( Key clusterKey, vectorrandomUniqueID(); outInfo.forward = leader.get().first.serializedInfo; - clientData->clientInfo->set(outInfo); + clientData->clientInfo->set(CachedSerialization(outInfo)); TraceEvent("MonitorLeaderForProxiesForwarding").detail("NewConnStr", leader.get().first.serializedInfo.toString()); return Void(); } @@ -709,11 +709,11 @@ ACTOR Future monitorProxiesOneGeneration( Reference(); } - state ErrorOr rep = wait( clientLeaderServer.openDatabase.tryGetReply( req, TaskPriority::CoordinationReply ) ); + state ErrorOr> rep = wait( clientLeaderServer.openDatabase.tryGetReply( req, TaskPriority::CoordinationReply ) ); if (rep.present()) { - if( rep.get().forward.present() ) { - TraceEvent("MonitorProxiesForwarding").detail("NewConnStr", rep.get().forward.get().toString()).detail("OldConnStr", info.intermediateConnFile->getConnectionString().toString()); - info.intermediateConnFile = Reference(new ClusterConnectionFile(connFile->getFilename(), ClusterConnectionString(rep.get().forward.get().toString()))); + if( rep.get().read().forward.present() ) { + TraceEvent("MonitorProxiesForwarding").detail("NewConnStr", rep.get().read().forward.get().toString()).detail("OldConnStr", info.intermediateConnFile->getConnectionString().toString()); + info.intermediateConnFile = Reference(new ClusterConnectionFile(connFile->getFilename(), ClusterConnectionString(rep.get().read().forward.get().toString()))); return info; } if(connFile != info.intermediateConnFile) { @@ -729,7 +729,7 @@ ACTOR Future monitorProxiesOneGeneration( ReferencenotifyConnected(); - auto& ni = rep.get(); + auto& ni = rep.get().mutate(); if(ni.proxies.size() > CLIENT_KNOBS->MAX_CLIENT_PROXY_CONNECTIONS) { std::vector proxyUIDs; for(auto& proxy : 
ni.proxies) { @@ -747,7 +747,7 @@ ACTOR Future monitorProxiesOneGeneration( Referenceset( rep.get() ); + clientInfo->set( rep.get().read() ); successIdx = idx; } else if(idx == successIdx) { wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY)); diff --git a/fdbclient/MonitorLeader.h b/fdbclient/MonitorLeader.h index da8af4a0ee..89a128ec4f 100644 --- a/fdbclient/MonitorLeader.h +++ b/fdbclient/MonitorLeader.h @@ -42,11 +42,11 @@ struct ClientStatusInfo { struct ClientData { std::map clientStatusInfoMap; - Reference> clientInfo; + Reference>> clientInfo; OpenDatabaseRequest getRequest(); - ClientData() : clientInfo( new AsyncVar( ClientDBInfo() ) ) {} + ClientData() : clientInfo( new AsyncVar>( CachedSerialization() ) ) {} }; template diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index c39c3e2571..2f7af5c0c8 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -89,7 +89,7 @@ class ClusterControllerData { public: struct DBInfo { Reference> clientInfo; - Reference> serverInfo; + Reference>> serverInfo; ProcessIssuesMap workersWithIssues; std::map incompatibleConnections; AsyncTrigger forceMasterFailure; @@ -105,34 +105,37 @@ public: DBInfo() : masterRegistrationCount(0), recoveryStalled(false), forceRecovery(false), unfinishedRecoveries(0), logGenerations(0), clientInfo( new AsyncVar( ClientDBInfo() ) ), - serverInfo( new AsyncVar( ServerDBInfo() ) ), + serverInfo( new AsyncVar>( CachedSerialization() ) ), db( DatabaseContext::create( clientInfo, Future(), LocalityData(), true, TaskPriority::DefaultEndpoint, true ) ) // SOMEDAY: Locality! 
{ } void setDistributor(const DataDistributorInterface& interf) { - ServerDBInfo newInfo = serverInfo->get(); + CachedSerialization newInfoCache = serverInfo->get(); + auto& newInfo = newInfoCache.mutate(); newInfo.id = deterministicRandom()->randomUniqueID(); newInfo.distributor = interf; - serverInfo->set( newInfo ); + serverInfo->set( newInfoCache ); } void setRatekeeper(const RatekeeperInterface& interf) { - ServerDBInfo newInfo = serverInfo->get(); + CachedSerialization newInfoCache = serverInfo->get(); + auto& newInfo = newInfoCache.mutate(); newInfo.id = deterministicRandom()->randomUniqueID(); newInfo.ratekeeper = interf; - serverInfo->set( newInfo ); + serverInfo->set( newInfoCache ); } void clearInterf(ProcessClass::ClassType t) { - ServerDBInfo newInfo = serverInfo->get(); + CachedSerialization newInfoCache = serverInfo->get(); + auto& newInfo = newInfoCache.mutate(); newInfo.id = deterministicRandom()->randomUniqueID(); if (t == ProcessClass::DataDistributorClass) { newInfo.distributor = Optional(); } else if (t == ProcessClass::RatekeeperClass) { newInfo.ratekeeper = Optional(); } - serverInfo->set( newInfo ); + serverInfo->set( newInfoCache ); } }; @@ -444,8 +447,8 @@ public: fitness = std::max(fitness, ProcessClass::ExcludeFit); } if( workerAvailable(it.second, checkStable) && fitness < unacceptableFitness && it.second.details.interf.locality.dcId()==dcId ) { - if ((db.serverInfo->get().distributor.present() && db.serverInfo->get().distributor.get().locality.processId() == it.first) || - (db.serverInfo->get().ratekeeper.present() && db.serverInfo->get().ratekeeper.get().locality.processId() == it.first)) { + if ((db.serverInfo->get().read().distributor.present() && db.serverInfo->get().read().distributor.get().locality.processId() == it.first) || + (db.serverInfo->get().read().ratekeeper.present() && db.serverInfo->get().read().ratekeeper.get().locality.processId() == it.first)) { fitness_workers[ std::make_pair(fitness, id_used[it.first]) 
].second.push_back(it.second.details); } else { fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].first.push_back(it.second.details); @@ -477,8 +480,8 @@ public: auto fitness = it.second.details.processClass.machineClassFitness( role ); if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.details.interf.address()) && it.second.details.interf.locality.dcId() == dcId && ( !minWorker.present() || ( it.second.details.interf.id() != minWorker.get().worker.interf.id() && ( fitness < minWorker.get().fitness || (fitness == minWorker.get().fitness && id_used[it.first] <= minWorker.get().used ) ) ) ) ) { - if ((db.serverInfo->get().distributor.present() && db.serverInfo->get().distributor.get().locality.processId() == it.first) || - (db.serverInfo->get().ratekeeper.present() && db.serverInfo->get().ratekeeper.get().locality.processId() == it.first)) { + if ((db.serverInfo->get().read().distributor.present() && db.serverInfo->get().read().distributor.get().locality.processId() == it.first) || + (db.serverInfo->get().read().ratekeeper.present() && db.serverInfo->get().read().ratekeeper.get().locality.processId() == it.first)) { fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].second.push_back(it.second.details); } else { fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].first.push_back(it.second.details); @@ -877,7 +880,7 @@ public: } void checkRecoveryStalled() { - if( (db.serverInfo->get().recoveryState == RecoveryState::RECRUITING || db.serverInfo->get().recoveryState == RecoveryState::ACCEPTING_COMMITS || db.serverInfo->get().recoveryState == RecoveryState::ALL_LOGS_RECRUITED) && db.recoveryStalled ) { + if( (db.serverInfo->get().read().recoveryState == RecoveryState::RECRUITING || db.serverInfo->get().read().recoveryState == RecoveryState::ACCEPTING_COMMITS || db.serverInfo->get().read().recoveryState == RecoveryState::ALL_LOGS_RECRUITED) && db.recoveryStalled ) { if (db.config.regions.size() > 1) { auto 
regions = db.config.regions; if(clusterControllerDcId.get() == regions[0].dcId) { @@ -891,7 +894,7 @@ public: //FIXME: determine when to fail the cluster controller when a primaryDC has not been set bool betterMasterExists() { - ServerDBInfo dbi = db.serverInfo->get(); + const ServerDBInfo dbi = db.serverInfo->get().read(); if(dbi.recoveryState < RecoveryState::ACCEPTING_COMMITS) { return false; @@ -1086,7 +1089,7 @@ public: ASSERT(masterProcessId.present()); if (processId == masterProcessId) return false; - auto& dbInfo = db.serverInfo->get(); + auto& dbInfo = db.serverInfo->get().read(); for (const MasterProxyInterface& interf : dbInfo.client.proxies) { if (interf.locality.processId() == processId) return true; } @@ -1109,7 +1112,7 @@ public: std::map>, int> idUsed; updateKnownIds(&idUsed); - auto& dbInfo = db.serverInfo->get(); + auto& dbInfo = db.serverInfo->get().read(); for (const auto& tlogset : dbInfo.logSystemConfig.tLogs) { for (const auto& tlog: tlogset.tLogs) { if (tlog.present()) { @@ -1164,12 +1167,13 @@ public: gotFullyRecoveredConfig(false), startTime(now()), datacenterVersionDifference(0), versionDifferenceUpdated(false), recruitingDistributor(false), recruitRatekeeper(false) { - auto serverInfo = db.serverInfo->get(); + CachedSerialization newInfoCache = db.serverInfo->get(); + auto& serverInfo = newInfoCache.mutate(); serverInfo.id = deterministicRandom()->randomUniqueID(); serverInfo.masterLifetime.ccID = id; serverInfo.clusterInterface = ccInterface; serverInfo.myLocality = locality; - db.serverInfo->set( serverInfo ); + db.serverInfo->set( newInfoCache ); cx = openDBOnServer(db.serverInfo, TaskPriority::DefaultEndpoint, true, true); } @@ -1204,7 +1208,7 @@ ACTOR Future clusterWatchDatabase( ClusterControllerData* cluster, Cluster continue; } RecruitMasterRequest rmq; - rmq.lifetime = db->serverInfo->get().masterLifetime; + rmq.lifetime = db->serverInfo->get().read().masterLifetime; rmq.forceRecovery = db->forceRecovery; 
cluster->masterProcessId = masterWorker.worker.interf.locality.processId(); @@ -1224,17 +1228,19 @@ ACTOR Future clusterWatchDatabase( ClusterControllerData* cluster, Cluster db->masterRegistrationCount = 0; db->recoveryStalled = false; - auto dbInfo = ServerDBInfo(); + auto cachedInfo = CachedSerialization(); + auto& dbInfo = cachedInfo.mutate(); + dbInfo.master = iMaster; dbInfo.id = deterministicRandom()->randomUniqueID(); - dbInfo.masterLifetime = db->serverInfo->get().masterLifetime; + dbInfo.masterLifetime = db->serverInfo->get().read().masterLifetime; ++dbInfo.masterLifetime; - dbInfo.clusterInterface = db->serverInfo->get().clusterInterface; - dbInfo.distributor = db->serverInfo->get().distributor; - dbInfo.ratekeeper = db->serverInfo->get().ratekeeper; + dbInfo.clusterInterface = db->serverInfo->get().read().clusterInterface; + dbInfo.distributor = db->serverInfo->get().read().distributor; + dbInfo.ratekeeper = db->serverInfo->get().read().ratekeeper; TraceEvent("CCWDB", cluster->id).detail("Lifetime", dbInfo.masterLifetime.toString()).detail("ChangeID", dbInfo.id); - db->serverInfo->set( dbInfo ); + db->serverInfo->set( cachedInfo ); state Future spinDelay = delay(SERVER_KNOBS->MASTER_SPIN_DELAY); // Don't retry master recovery more than once per second, but don't delay the "first" recovery after more than a second of normal operation @@ -1271,16 +1277,16 @@ ACTOR Future clusterWatchDatabase( ClusterControllerData* cluster, Cluster ACTOR Future clusterGetServerInfo(ClusterControllerData::DBInfo* db, UID knownServerInfoID, Standalone> issues, std::vector incompatiblePeers, - ReplyPromise reply) { + ReplyPromise> reply) { state Optional issueID; setIssues(db->workersWithIssues, reply.getEndpoint().getPrimaryAddress(), issues, issueID); for(auto it : incompatiblePeers) { db->incompatibleConnections[it] = now() + SERVER_KNOBS->INCOMPATIBLE_PEERS_LOGGING_INTERVAL; } - while (db->serverInfo->get().id == knownServerInfoID) { + while 
(db->serverInfo->get().read().id == knownServerInfoID) { choose { - when (wait( db->serverInfo->onChange() )) {} + when (wait( yieldedFuture(db->serverInfo->onChange()) )) {} when (wait( delayJittered( 300 ) )) { break; } // The server might be long gone! } } @@ -1372,11 +1378,11 @@ void checkOutstandingStorageRequests( ClusterControllerData* self ) { } void checkBetterDDOrRK(ClusterControllerData* self) { - if (!self->masterProcessId.present() || self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) { + if (!self->masterProcessId.present() || self->db.serverInfo->get().read().recoveryState < RecoveryState::ACCEPTING_COMMITS) { return; } - const ServerDBInfo& db = self->db.serverInfo->get(); + auto& db = self->db.serverInfo->get().read(); auto bestFitnessForRK = self->getBestFitnessForRoleInDatacenter(ProcessClass::Ratekeeper); auto bestFitnessForDD = self->getBestFitnessForRoleInDatacenter(ProcessClass::DataDistributor); @@ -1422,7 +1428,7 @@ ACTOR Future doCheckOutstandingRequests( ClusterControllerData* self ) { self->checkRecoveryStalled(); if (self->betterMasterExists()) { self->db.forceMasterFailure.trigger(); - TraceEvent("MasterRegistrationKill", self->id).detail("MasterId", self->db.serverInfo->get().master.id()); + TraceEvent("MasterRegistrationKill", self->id).detail("MasterId", self->db.serverInfo->get().read().master.id()); } } catch( Error &e ) { if(e.code() != error_code_operation_failed && e.code() != error_code_no_more_servers) { @@ -1719,8 +1725,8 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c //make sure the request comes from an active database auto db = &self->db; - if ( db->serverInfo->get().master.id() != req.id || req.registrationCount <= db->masterRegistrationCount ) { - TraceEvent("MasterRegistrationNotFound", self->id).detail("MasterId", req.id).detail("ExistingId", db->serverInfo->get().master.id()).detail("RegCount", req.registrationCount).detail("ExistingRegCount", 
db->masterRegistrationCount); + if ( db->serverInfo->get().read().master.id() != req.id || req.registrationCount <= db->masterRegistrationCount ) { + TraceEvent("MasterRegistrationNotFound", self->id).detail("MasterId", req.id).detail("ExistingId", db->serverInfo->get().read().master.id()).detail("RegCount", req.registrationCount).detail("ExistingRegCount", db->masterRegistrationCount); return; } @@ -1753,7 +1759,8 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c } bool isChanged = false; - auto dbInfo = self->db.serverInfo->get(); + auto cachedInfo = self->db.serverInfo->get(); + auto& dbInfo = cachedInfo.mutate(); if (dbInfo.recoveryState != req.recoveryState) { dbInfo.recoveryState = req.recoveryState; @@ -1794,7 +1801,7 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c if( isChanged ) { dbInfo.id = deterministicRandom()->randomUniqueID(); - self->db.serverInfo->set( dbInfo ); + self->db.serverInfo->set( cachedInfo ); } checkOutstandingRequests(self); @@ -1855,7 +1862,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) { if( info == self->id_worker.end() ) { self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, newPriorityInfo, req.degraded ); - if (!self->masterProcessId.present() && w.locality.processId() == self->db.serverInfo->get().master.locality.processId()) { + if (!self->masterProcessId.present() && w.locality.processId() == self->db.serverInfo->get().read().master.locality.processId()) { self->masterProcessId = w.locality.processId(); } checkOutstandingRequests( self ); @@ -1879,7 +1886,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) { TEST(true); // Received an old worker registration request. 
} - if (req.distributorInterf.present() && !self->db.serverInfo->get().distributor.present() && + if (req.distributorInterf.present() && !self->db.serverInfo->get().read().distributor.present() && self->clusterControllerDcId == req.distributorInterf.get().locality.dcId() && !self->recruitingDistributor) { const DataDistributorInterface& di = req.distributorInterf.get(); @@ -1896,7 +1903,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) { self->id_worker[w.locality.processId()].haltRatekeeper = brokenPromiseToNever(req.ratekeeperInterf.get().haltRatekeeper.getReply(HaltRatekeeperRequest(self->id))); } else if(!self->recruitingRatekeeperID.present()) { const RatekeeperInterface& rki = req.ratekeeperInterf.get(); - const auto& ratekeeper = self->db.serverInfo->get().ratekeeper; + const auto& ratekeeper = self->db.serverInfo->get().read().ratekeeper; TraceEvent("CCRegisterRatekeeper", self->id).detail("RKID", rki.id()); if (ratekeeper.present() && ratekeeper.get().id() != rki.id() && self->id_worker.count(ratekeeper.get().locality.processId())) { TraceEvent("CCHaltPreviousRatekeeper", self->id).detail("RKID", ratekeeper.get().id()) @@ -2165,12 +2172,13 @@ ACTOR Future monitorServerInfoConfig(ClusterControllerData::DBInfo* db) { config = LatencyBandConfig::parse(configVal.get()); } - ServerDBInfo serverInfo = db->serverInfo->get(); + auto cachedInfo = db->serverInfo->get(); + auto& serverInfo = cachedInfo.mutate(); if(config != serverInfo.latencyBandConfig) { TraceEvent("LatencyBandConfigChanged").detail("Present", config.present()); serverInfo.id = deterministicRandom()->randomUniqueID(); serverInfo.latencyBandConfig = config; - db->serverInfo->set(serverInfo); + db->serverInfo->set(cachedInfo); } state Future configChangeFuture = tr.watch(latencyBandConfigKey); @@ -2324,7 +2332,7 @@ ACTOR Future updateDatacenterVersionDifference( ClusterControllerData *sel state double lastLogTime = 0; loop { self->versionDifferenceUpdated = false; - 
if(self->db.serverInfo->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS && self->db.config.usableRegions == 1) { + if(self->db.serverInfo->get().read().recoveryState >= RecoveryState::ACCEPTING_COMMITS && self->db.config.usableRegions == 1) { bool oldDifferenceTooLarge = !self->versionDifferenceUpdated || self->datacenterVersionDifference >= SERVER_KNOBS->MAX_VERSION_DIFFERENCE; self->versionDifferenceUpdated = true; self->datacenterVersionDifference = 0; @@ -2339,8 +2347,8 @@ ACTOR Future updateDatacenterVersionDifference( ClusterControllerData *sel state Optional primaryLog; state Optional remoteLog; - if(self->db.serverInfo->get().recoveryState >= RecoveryState::ALL_LOGS_RECRUITED) { - for(auto& logSet : self->db.serverInfo->get().logSystemConfig.tLogs) { + if(self->db.serverInfo->get().read().recoveryState >= RecoveryState::ALL_LOGS_RECRUITED) { + for(auto& logSet : self->db.serverInfo->get().read().logSystemConfig.tLogs) { if(logSet.isLocal && logSet.locality != tagLocalitySatellite) { for(auto& tLog : logSet.tLogs) { if(tLog.present()) { @@ -2441,12 +2449,12 @@ ACTOR Future startDataDistributor( ClusterControllerDa TraceEvent("CCStartDataDistributor", self->id); loop { try { - state bool no_distributor = !self->db.serverInfo->get().distributor.present(); - while (!self->masterProcessId.present() || self->masterProcessId != self->db.serverInfo->get().master.locality.processId() || self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) { + state bool no_distributor = !self->db.serverInfo->get().read().distributor.present(); + while (!self->masterProcessId.present() || self->masterProcessId != self->db.serverInfo->get().read().master.locality.processId() || self->db.serverInfo->get().read().recoveryState < RecoveryState::ACCEPTING_COMMITS) { wait(self->db.serverInfo->onChange() || delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY)); } - if (no_distributor && self->db.serverInfo->get().distributor.present()) { - return 
self->db.serverInfo->get().distributor.get(); + if (no_distributor && self->db.serverInfo->get().read().distributor.present()) { + return self->db.serverInfo->get().read().distributor.get(); } std::map>, int> id_used = self->getUsedIds(); @@ -2476,15 +2484,15 @@ ACTOR Future startDataDistributor( ClusterControllerDa } ACTOR Future monitorDataDistributor(ClusterControllerData *self) { - while(self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) { + while(self->db.serverInfo->get().read().recoveryState < RecoveryState::ACCEPTING_COMMITS) { wait(self->db.serverInfo->onChange()); } loop { - if ( self->db.serverInfo->get().distributor.present() ) { - wait( waitFailureClient( self->db.serverInfo->get().distributor.get().waitFailure, SERVER_KNOBS->DD_FAILURE_TIME ) ); + if ( self->db.serverInfo->get().read().distributor.present() ) { + wait( waitFailureClient( self->db.serverInfo->get().read().distributor.get().waitFailure, SERVER_KNOBS->DD_FAILURE_TIME ) ); TraceEvent("CCDataDistributorDied", self->id) - .detail("DistributorId", self->db.serverInfo->get().distributor.get().id()); + .detail("DistributorId", self->db.serverInfo->get().read().distributor.get().id()); self->db.clearInterf(ProcessClass::DataDistributorClass); } else { self->recruitingDistributor = true; @@ -2501,11 +2509,11 @@ ACTOR Future startRatekeeper(ClusterControllerData *self) { TraceEvent("CCStartRatekeeper", self->id); loop { try { - state bool no_ratekeeper = !self->db.serverInfo->get().ratekeeper.present(); - while (!self->masterProcessId.present() || self->masterProcessId != self->db.serverInfo->get().master.locality.processId() || self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) { + state bool no_ratekeeper = !self->db.serverInfo->get().read().ratekeeper.present(); + while (!self->masterProcessId.present() || self->masterProcessId != self->db.serverInfo->get().read().master.locality.processId() || self->db.serverInfo->get().read().recoveryState 
< RecoveryState::ACCEPTING_COMMITS) { wait(self->db.serverInfo->onChange() || delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY)); } - if (no_ratekeeper && self->db.serverInfo->get().ratekeeper.present()) { + if (no_ratekeeper && self->db.serverInfo->get().read().ratekeeper.present()) { // Existing ratekeeper registers while waiting, so skip. return Void(); } @@ -2525,7 +2533,7 @@ ACTOR Future startRatekeeper(ClusterControllerData *self) { if (interf.present()) { self->recruitRatekeeper.set(false); self->recruitingRatekeeperID = interf.get().id(); - const auto& ratekeeper = self->db.serverInfo->get().ratekeeper; + const auto& ratekeeper = self->db.serverInfo->get().read().ratekeeper; TraceEvent("CCRatekeeperRecruited", self->id).detail("Addr", worker.interf.address()).detail("RKID", interf.get().id()); if (ratekeeper.present() && ratekeeper.get().id() != interf.get().id() && self->id_worker.count(ratekeeper.get().locality.processId())) { TraceEvent("CCHaltRatekeeperAfterRecruit", self->id).detail("RKID", ratekeeper.get().id()) @@ -2550,16 +2558,16 @@ ACTOR Future startRatekeeper(ClusterControllerData *self) { } ACTOR Future monitorRatekeeper(ClusterControllerData *self) { - while(self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) { + while(self->db.serverInfo->get().read().recoveryState < RecoveryState::ACCEPTING_COMMITS) { wait(self->db.serverInfo->onChange()); } loop { - if ( self->db.serverInfo->get().ratekeeper.present() && !self->recruitRatekeeper.get() ) { + if ( self->db.serverInfo->get().read().ratekeeper.present() && !self->recruitRatekeeper.get() ) { choose { - when(wait(waitFailureClient( self->db.serverInfo->get().ratekeeper.get().waitFailure, SERVER_KNOBS->RATEKEEPER_FAILURE_TIME ))) { + when(wait(waitFailureClient( self->db.serverInfo->get().read().ratekeeper.get().waitFailure, SERVER_KNOBS->RATEKEEPER_FAILURE_TIME ))) { TraceEvent("CCRatekeeperDied", self->id) - .detail("RKID", 
self->db.serverInfo->get().ratekeeper.get().id()); + .detail("RKID", self->db.serverInfo->get().read().ratekeeper.get().id()); self->db.clearInterf(ProcessClass::RatekeeperClass); } when(wait(self->recruitRatekeeper.onChange())) {} diff --git a/fdbserver/Coordination.actor.cpp b/fdbserver/Coordination.actor.cpp index b88f9879bb..7ee6dd542b 100644 --- a/fdbserver/Coordination.actor.cpp +++ b/fdbserver/Coordination.actor.cpp @@ -209,7 +209,7 @@ TEST_CASE("/fdbserver/Coordination/localGenerationReg/simple") { } ACTOR Future openDatabase(ClientData* db, int* clientCount, Reference> hasConnectedClients, OpenDatabaseCoordRequest req) { - if(db->clientInfo->get().id != req.knownClientInfoID && !db->clientInfo->get().forward.present()) { + if(db->clientInfo->get().read().id != req.knownClientInfoID && !db->clientInfo->get().read().forward.present()) { req.reply.send( db->clientInfo->get() ); return Void(); } @@ -218,9 +218,9 @@ ACTOR Future openDatabase(ClientData* db, int* clientCount, ReferenceclientStatusInfoMap[req.reply.getEndpoint().getPrimaryAddress()] = ClientStatusInfo(req.traceLogGroup, req.supportedVersions, req.issues); - while (db->clientInfo->get().id == req.knownClientInfoID && !db->clientInfo->get().forward.present()) { + while (db->clientInfo->get().read().id == req.knownClientInfoID && !db->clientInfo->get().read().forward.present()) { choose { - when (wait( db->clientInfo->onChange() )) {} + when (wait( yieldedFuture(db->clientInfo->onChange()) )) {} when (wait( delayJittered( SERVER_KNOBS->CLIENT_REGISTER_INTERVAL ) )) { break; } // The client might be long gone! 
} } @@ -315,10 +315,12 @@ ACTOR Future leaderRegister(LeaderElectionRegInterface interf, Key key) { ClientDBInfo outInfo; outInfo.id = deterministicRandom()->randomUniqueID(); outInfo.forward = req.conn.toString(); - clientData.clientInfo->set(outInfo); + clientData.clientInfo->set(CachedSerialization(outInfo)); req.reply.send( Void() ); - ASSERT(!hasConnectedClients->get()); - return Void(); + if(!hasConnectedClients->get()) { + return Void(); + } + nextInterval = Future(); } when ( wait(nextInterval.isValid() ? nextInterval : Never()) ) { if (!availableLeaders.size() && !availableCandidates.size() && !notify.size() && @@ -478,7 +480,7 @@ ACTOR Future leaderServer(LeaderElectionRegInterface interf, OnDemandStore ClientDBInfo info; info.id = deterministicRandom()->randomUniqueID(); info.forward = forward.get().serializedInfo; - req.reply.send( info ); + req.reply.send( CachedSerialization(info) ); } else { regs.getInterface(req.clusterKey, id).openDatabase.send( req ); } diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index a978e081e4..c663645510 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -1810,7 +1810,7 @@ struct DDTeamCollection : ReferenceCounted { // The desired machine team number is not the same with the desired server team number // in notEnoughTeamsForAServer() below, because the machineTeamRemover() does not // remove a machine team with the most number of machine teams. 
- if (m.second->machineTeams.size() < targetMachineTeamNumPerMachine) { + if (m.second->machineTeams.size() < targetMachineTeamNumPerMachine && isMachineHealthy(m.second)) { return true; } } @@ -1831,7 +1831,7 @@ struct DDTeamCollection : ReferenceCounted { int targetTeamNumPerServer = (SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (configuration.storageTeamSize + 1)) / 2; ASSERT(targetTeamNumPerServer > 0); for (auto& s : server_info) { - if (s.second->teams.size() < targetTeamNumPerServer) { + if (s.second->teams.size() < targetTeamNumPerServer && !server_status.get(s.first).isUnhealthy()) { return true; } } diff --git a/fdbserver/ServerDBInfo.h b/fdbserver/ServerDBInfo.h index 595507b1dd..67407e1fa9 100644 --- a/fdbserver/ServerDBInfo.h +++ b/fdbserver/ServerDBInfo.h @@ -67,7 +67,7 @@ struct GetServerDBInfoRequest { UID knownServerInfoID; Standalone> issues; std::vector incompatiblePeers; - ReplyPromise< struct ServerDBInfo > reply; + ReplyPromise< CachedSerialization > reply; template void serialize(Ar& ar) { diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index 8c9e64beed..5788a7895e 100644 --- a/fdbserver/Status.actor.cpp +++ b/fdbserver/Status.actor.cpp @@ -566,7 +566,7 @@ struct RolesInfo { }; ACTOR static Future processStatusFetcher( - Reference> db, std::vector workers, WorkerEvents pMetrics, + Reference>> db, std::vector workers, WorkerEvents pMetrics, WorkerEvents mMetrics, WorkerEvents nMetrics, WorkerEvents errors, WorkerEvents traceFileOpenErrors, WorkerEvents programStarts, std::map> processIssues, vector> storageServers, @@ -624,18 +624,18 @@ ACTOR static Future processStatusFetcher( state RolesInfo roles; - roles.addRole("master", db->get().master); - roles.addRole("cluster_controller", db->get().clusterInterface.clientInterface); + roles.addRole("master", db->get().read().master); + roles.addRole("cluster_controller", db->get().read().clusterInterface.clientInterface); - if (db->get().distributor.present()) { - 
roles.addRole("data_distributor", db->get().distributor.get()); + if (db->get().read().distributor.present()) { + roles.addRole("data_distributor", db->get().read().distributor.get()); } - if (db->get().ratekeeper.present()) { - roles.addRole("ratekeeper", db->get().ratekeeper.get()); + if (db->get().read().ratekeeper.present()) { + roles.addRole("ratekeeper", db->get().read().ratekeeper.get()); } - for(auto& tLogSet : db->get().logSystemConfig.tLogs) { + for(auto& tLogSet : db->get().read().logSystemConfig.tLogs) { for(auto& it : tLogSet.logRouters) { if(it.present()) { roles.addRole("router", it.interf()); @@ -643,7 +643,7 @@ ACTOR static Future processStatusFetcher( } } - for(auto& old : db->get().logSystemConfig.oldTLogs) { + for(auto& old : db->get().read().logSystemConfig.oldTLogs) { for(auto& tLogSet : old.tLogs) { for(auto& it : tLogSet.logRouters) { if(it.present()) { @@ -686,7 +686,7 @@ ACTOR static Future processStatusFetcher( } state std::vector::const_iterator res; - state std::vector resolvers = db->get().resolvers; + state std::vector resolvers = db->get().read().resolvers; for(res = resolvers.begin(); res != resolvers.end(); ++res) { roles.addRole( "resolver", *res ); wait(yield()); @@ -1464,8 +1464,8 @@ ACTOR static Future>> getStor return results; } -ACTOR static Future>> getTLogsAndMetrics(Reference> db, std::unordered_map address_workers) { - vector servers = db->get().logSystemConfig.allPresentLogs(); +ACTOR static Future>> getTLogsAndMetrics(Reference>> db, std::unordered_map address_workers) { + vector servers = db->get().read().logSystemConfig.allPresentLogs(); vector> results = wait(getServerMetrics(servers, address_workers, std::vector{ "TLogMetrics" })); @@ -1550,7 +1550,7 @@ JsonBuilderObject getPerfLimit(TraceEventFields const& ratekeeper, double transP return perfLimit; } -ACTOR static Future workloadStatusFetcher(Reference> db, vector workers, WorkerDetails mWorker, WorkerDetails rkWorker, +ACTOR static Future 
workloadStatusFetcher(Reference>> db, vector workers, WorkerDetails mWorker, WorkerDetails rkWorker, JsonBuilderObject *qos, JsonBuilderObject *data_overlay, std::set *incomplete_reasons, Future>>> storageServerFuture) { state JsonBuilderObject statusObj; @@ -1565,7 +1565,7 @@ ACTOR static Future workloadStatusFetcher(Referenceget().client.proxies) { + for (auto &p : db->get().read().client.proxies) { auto worker = getWorker(workersMap, p.address()); if (worker.present()) proxyStatFutures.push_back(timeoutError(worker.get().interf.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("ProxyMetrics"))), 1.0)); @@ -1772,11 +1772,11 @@ ACTOR static Future clusterSummaryStatisticsFetcher(WorkerEve return statusObj; } -static JsonBuilderArray oldTlogFetcher(int* oldLogFaultTolerance, Reference> db, std::unordered_map const& address_workers) { +static JsonBuilderArray oldTlogFetcher(int* oldLogFaultTolerance, Reference>> db, std::unordered_map const& address_workers) { JsonBuilderArray oldTlogsArray; - if(db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS) { - for(auto it : db->get().logSystemConfig.oldTLogs) { + if(db->get().read().recoveryState >= RecoveryState::ACCEPTING_COMMITS) { + for(auto it : db->get().read().logSystemConfig.oldTLogs) { JsonBuilderObject statusObj; JsonBuilderArray logsObj; Optional sat_log_replication_factor, sat_log_write_anti_quorum, sat_log_fault_tolerance, log_replication_factor, log_write_anti_quorum, log_fault_tolerance, remote_log_replication_factor, remote_log_fault_tolerance; @@ -2022,7 +2022,7 @@ ACTOR Future layerStatusFetcher(Database cx, JsonBuilderArray return statusObj; } -ACTOR Future lockedStatusFetcher(Reference> db, JsonBuilderArray *messages, std::set *incomplete_reasons) { +ACTOR Future lockedStatusFetcher(Reference>> db, JsonBuilderArray *messages, std::set *incomplete_reasons) { state JsonBuilderObject statusObj; state Database cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, false); // Open a new 
database connection that isn't lock-aware @@ -2063,7 +2063,7 @@ ACTOR Future lockedStatusFetcher(Reference clusterGetStatus( - Reference> db, + Reference>> db, Database cx, vector workers, ProcessIssuesMap workerIssues, @@ -2083,7 +2083,7 @@ ACTOR Future clusterGetStatus( try { // Get the master Worker interface - Optional _mWorker = getWorker( workers, db->get().master.address() ); + Optional _mWorker = getWorker( workers, db->get().read().master.address() ); if (_mWorker.present()) { mWorker = _mWorker.get(); } else { @@ -2091,11 +2091,11 @@ ACTOR Future clusterGetStatus( } // Get the DataDistributor worker interface Optional _ddWorker; - if (db->get().distributor.present()) { - _ddWorker = getWorker( workers, db->get().distributor.get().address() ); + if (db->get().read().distributor.present()) { + _ddWorker = getWorker( workers, db->get().read().distributor.get().address() ); } - if (!db->get().distributor.present() || !_ddWorker.present()) { + if (!db->get().read().distributor.present() || !_ddWorker.present()) { messages.push_back(JsonString::makeMessage("unreachable_dataDistributor_worker", "Unable to locate the data distributor worker.")); } else { ddWorker = _ddWorker.get(); @@ -2103,11 +2103,11 @@ ACTOR Future clusterGetStatus( // Get the Ratekeeper worker interface Optional _rkWorker; - if (db->get().ratekeeper.present()) { - _rkWorker = getWorker( workers, db->get().ratekeeper.get().address() ); + if (db->get().read().ratekeeper.present()) { + _rkWorker = getWorker( workers, db->get().read().ratekeeper.get().address() ); } - if (!db->get().ratekeeper.present() || !_rkWorker.present()) { + if (!db->get().read().ratekeeper.present() || !_rkWorker.present()) { messages.push_back(JsonString::makeMessage("unreachable_ratekeeper_worker", "Unable to locate the ratekeeper worker.")); } else { rkWorker = _rkWorker.get(); @@ -2165,8 +2165,8 @@ ACTOR Future clusterGetStatus( state WorkerEvents programStarts = workerEventsVec[5].present() ? 
workerEventsVec[5].get().first : WorkerEvents(); state JsonBuilderObject statusObj; - if(db->get().recoveryCount > 0) { - statusObj["generation"] = db->get().recoveryCount; + if(db->get().read().recoveryCount > 0) { + statusObj["generation"] = db->get().read().recoveryCount; } state std::map> processIssues = @@ -2242,7 +2242,7 @@ ACTOR Future clusterGetStatus( state std::vector workerStatuses = wait(getAll(futures2)); int oldLogFaultTolerance = 100; - if(db->get().recoveryState >= RecoveryState::ACCEPTING_COMMITS && db->get().logSystemConfig.oldTLogs.size() > 0) { + if(db->get().read().recoveryState >= RecoveryState::ACCEPTING_COMMITS && db->get().read().logSystemConfig.oldTLogs.size() > 0) { statusObj["old_logs"] = oldTlogFetcher(&oldLogFaultTolerance, db, address_workers); } diff --git a/fdbserver/Status.h b/fdbserver/Status.h index 7fb97f442c..7a6537e94e 100644 --- a/fdbserver/Status.h +++ b/fdbserver/Status.h @@ -34,7 +34,7 @@ void setIssues(ProcessIssuesMap& issueMap, NetworkAddress const& addr, VectorRef void removeIssues(ProcessIssuesMap& issueMap, NetworkAddress const& addr, Optional& issueID); -Future clusterGetStatus( Reference> const& db, Database const& cx, vector const& workers, +Future clusterGetStatus( Reference>> const& db, Database const& cx, vector const& workers, ProcessIssuesMap const& workerIssues, std::map>* const& clientStatus, ServerCoordinators const& coordinators, std::vector const& incompatibleConnections, Version const& datacenterVersionDifference ); #endif diff --git a/fdbserver/TLogInterface.h b/fdbserver/TLogInterface.h index bf54f4c3fd..7210feb79a 100644 --- a/fdbserver/TLogInterface.h +++ b/fdbserver/TLogInterface.h @@ -51,9 +51,9 @@ struct TLogInterface { TLogInterface() {} - explicit TLogInterface(LocalityData locality) : uniqueID( deterministicRandom()->randomUniqueID() ), locality(locality) { sharedTLogID = uniqueID; } - TLogInterface(UID sharedTLogID, LocalityData locality) : uniqueID( deterministicRandom()->randomUniqueID() ), 
sharedTLogID(sharedTLogID), locality(locality) {} - TLogInterface(UID uniqueID, UID sharedTLogID, LocalityData locality) : uniqueID(uniqueID), sharedTLogID(sharedTLogID), locality(locality) {} + explicit TLogInterface(const LocalityData& locality) : uniqueID( deterministicRandom()->randomUniqueID() ), locality(locality) { sharedTLogID = uniqueID; } + TLogInterface(UID sharedTLogID, const LocalityData& locality) : uniqueID( deterministicRandom()->randomUniqueID() ), sharedTLogID(sharedTLogID), locality(locality) {} + TLogInterface(UID uniqueID, UID sharedTLogID, const LocalityData& locality) : uniqueID(uniqueID), sharedTLogID(sharedTLogID), locality(locality) {} UID id() const { return uniqueID; } UID getSharedTLogID() const { return sharedTLogID; } std::string toString() const { return id().shortString(); } diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index b46582259f..113d80eb17 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -434,6 +434,8 @@ struct LogData : NonCopyable, public ReferenceCounted { Version queueCommittingVersion; Version knownCommittedVersion, durableKnownCommittedVersion, minKnownCommittedVersion; Version queuePoppedVersion; + Version minPoppedTagVersion; + Tag minPoppedTag; Deque>>> messageBlocks; std::vector>> tag_data; //tag.locality | tag.id @@ -498,6 +500,7 @@ struct LogData : NonCopyable, public ReferenceCounted { explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, int txsTags, UID recruitmentID, ProtocolVersion protocolVersion, std::vector tags) : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()), cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), txsTags(txsTags), recruitmentID(recruitmentID), protocolVersion(protocolVersion), logSystem(new AsyncVar>()), logRouterPoppedVersion(0), 
durableKnownCommittedVersion(0), minKnownCommittedVersion(0), queuePoppedVersion(0), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()), + minPoppedTagVersion(0), minPoppedTag(invalidTag), // These are initialized differently on init() or recovery recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), unrecoveredBefore(1), recoveredAt(1), unpoppedRecoveredTags(0), logRouterPopToVersion(0), locality(tagLocalityInvalid), execOpCommitInProgress(false) @@ -515,6 +518,8 @@ struct LogData : NonCopyable, public ReferenceCounted { specialCounter(cc, "PersistentDataDurableVersion", [this](){ return this->persistentDataDurableVersion; }); specialCounter(cc, "KnownCommittedVersion", [this](){ return this->knownCommittedVersion; }); specialCounter(cc, "QueuePoppedVersion", [this](){ return this->queuePoppedVersion; }); + specialCounter(cc, "MinPoppedTagVersion", [this](){ return this->minPoppedTagVersion; }); + specialCounter(cc, "MinPoppedTag", [this](){ return this->minPoppedTag.toString(); }); specialCounter(cc, "SharedBytesInput", [tLogData](){ return tLogData->bytesInput; }); specialCounter(cc, "SharedBytesDurable", [tLogData](){ return tLogData->bytesDurable; }); specialCounter(cc, "SharedOverheadBytesInput", [tLogData](){ return tLogData->overheadBytesInput; }); @@ -733,12 +738,20 @@ ACTOR Future popDiskQueue( TLogData* self, Reference logData ) { minLocation = locationIter->value.first; minVersion = locationIter->key; } + logData->minPoppedTagVersion = std::numeric_limits::max(); + for(int tagLocality = 0; tagLocality < logData->tag_data.size(); tagLocality++) { for(int tagId = 0; tagId < logData->tag_data[tagLocality].size(); tagId++) { Reference tagData = logData->tag_data[tagLocality][tagId]; - if (tagData && tagData->tag.locality != tagLocalityTxs && tagData->tag != txsTag && !tagData->nothingPersistent) { - minLocation = std::min(minLocation, tagData->poppedLocation); - 
minVersion = std::min(minVersion, tagData->popped); + if (tagData && tagData->tag.locality != tagLocalityTxs && tagData->tag != txsTag) { + if(!tagData->nothingPersistent) { + minLocation = std::min(minLocation, tagData->poppedLocation); + minVersion = std::min(minVersion, tagData->popped); + } + if((!tagData->nothingPersistent || tagData->versionMessages.size()) && tagData->popped < logData->minPoppedTagVersion) { + logData->minPoppedTagVersion = tagData->popped; + logData->minPoppedTag = tagData->tag; + } } } } diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index 448b9138f5..c0d447d35f 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -413,6 +413,7 @@ void endRole(const Role &role, UID id, std::string reason, bool ok = true, Error struct ServerDBInfo; class Database openDBOnServer( Reference> const& db, TaskPriority taskID = TaskPriority::DefaultEndpoint, bool enableLocalityLoadBalance = true, bool lockAware = false ); +class Database openDBOnServer( Reference>> const& db, TaskPriority taskID = TaskPriority::DefaultEndpoint, bool enableLocalityLoadBalance = true, bool lockAware = false ); ACTOR Future extractClusterInterface(Reference>> a, Reference>> b); diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index 5ca2830f58..aac7785498 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -545,6 +545,10 @@ ACTOR Future>> recruitEverything( Refere if (!self->configuration.isValid()) { RecoveryStatus::RecoveryStatus status; if (self->configuration.initialized) { + TraceEvent(SevWarn, "MasterRecoveryInvalidConfiguration", self->dbgid) + .setMaxEventLength(11000) + .setMaxFieldLength(10000) + .detail("Conf", self->configuration.toString()); status = RecoveryStatus::configuration_invalid; } else if (!self->cstate.prevDBState.tLogs.size()) { status = RecoveryStatus::configuration_never_created; @@ -665,7 +669,12 @@ ACTOR Future 
readTransactionSystemState( Reference self, Refer self->configuration.fromKeyValues( rawConf ); self->originalConfiguration = self->configuration; self->hasConfiguration = true; - TraceEvent("MasterRecoveredConfig", self->dbgid).detail("Conf", self->configuration.toString()).trackLatest("RecoveredConfig"); + + TraceEvent("MasterRecoveredConfig", self->dbgid) + .setMaxEventLength(11000) + .setMaxFieldLength(10000) + .detail("Conf", self->configuration.toString()) + .trackLatest("RecoveredConfig"); Standalone> rawLocalities = wait( self->txnStateStore->readRange( tagLocalityListKeys ) ); self->dcId_locality.clear(); @@ -797,7 +806,10 @@ void updateConfigForForcedRecovery(Reference self, vectorconfiguration.getRegionJSON(); regionCommit.mutations.push_back_deep(regionCommit.arena(), MutationRef(MutationRef::SetValue, configKeysPrefix.toString() + "regions", BinaryWriter::toValue(regionJSON, IncludeVersion()).toString())); self->configuration.applyMutation( regionCommit.mutations.back() ); //modifying the configuration directly does not change the configuration when it is re-serialized unless we call applyMutation - TraceEvent("ForcedRecoveryConfigChange", self->dbgid).detail("Conf", self->configuration.toString()); + TraceEvent("ForcedRecoveryConfigChange", self->dbgid) + .setMaxEventLength(11000) + .setMaxFieldLength(10000) + .detail("Conf", self->configuration.toString()); } initialConfChanges->push_back(regionCommit); } diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 0156197e30..c9908f62ab 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -465,9 +465,10 @@ public: struct Counters { CounterCollection cc; - Counter allQueries, getKeyQueries, getValueQueries, getRangeQueries, finishedQueries, rowsQueried, bytesQueried, watchQueries; + Counter allQueries, getKeyQueries, getValueQueries, getRangeQueries, finishedQueries, rowsQueried, bytesQueried, watchQueries, emptyQueries; Counter 
bytesInput, bytesDurable, bytesFetched, mutationBytes; // Like bytesInput but without MVCC accounting + Counter sampledBytesCleared; Counter mutations, setMutations, clearRangeMutations, atomicMutations; Counter updateBatches, updateVersions; Counter loops; @@ -486,10 +487,12 @@ public: rowsQueried("RowsQueried", cc), bytesQueried("BytesQueried", cc), watchQueries("WatchQueries", cc), + emptyQueries("EmptyQueries", cc), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), bytesFetched("BytesFetched", cc), mutationBytes("MutationBytes", cc), + sampledBytesCleared("SampledBytesCleared", cc), mutations("Mutations", cc), setMutations("SetMutations", cc), clearRangeMutations("ClearRangeMutations", cc), @@ -881,6 +884,9 @@ ACTOR Future getValueQ( StorageServer* data, GetValueRequest req ) { resultSize = v.get().size(); data->counters.bytesQueried += resultSize; } + else { + ++data->counters.emptyQueries; + } if( req.debugID.present() ) g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask()); @@ -1439,6 +1445,9 @@ ACTOR Future getKeyValues( StorageServer* data, GetKeyValuesRequest req ) resultSize = req.limitBytes - remainingLimitBytes; data->counters.bytesQueried += resultSize; data->counters.rowsQueried += r.data.size(); + if(r.data.size() == 0) { + ++data->counters.emptyQueries; + } } } catch (Error& e) { if(!canReplyWith(e)) @@ -3269,6 +3278,8 @@ void StorageServer::byteSampleApplyClear( KeyRangeRef range, Version ver ) { if(range.begin < allKeys.end) { //NotifyBytes should not be called for keys past allKeys.end KeyRangeRef searchRange = KeyRangeRef(range.begin, std::min(range.end, allKeys.end)); + counters.sampledBytesCleared += byteSample.sumRange(searchRange.begin, searchRange.end); + auto r = metrics.waitMetricsMap.intersectingRanges(searchRange); for(auto shard = r.begin(); shard != r.end(); ++shard) { KeyRangeRef intersectingRange = shard.range() & range; diff --git 
a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index e2ab425f52..22f2b221ef 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -75,11 +75,23 @@ ACTOR static Future extractClientInfo( Reference> d } } +ACTOR static Future extractClientInfo( Reference>> db, Reference> info ) { + loop { + info->set( db->get().read().client ); + wait( db->onChange() ); + } +} + Database openDBOnServer( Reference> const& db, TaskPriority taskID, bool enableLocalityLoadBalance, bool lockAware ) { Reference> info( new AsyncVar ); return DatabaseContext::create( info, extractClientInfo(db, info), enableLocalityLoadBalance ? db->get().myLocality : LocalityData(), enableLocalityLoadBalance, taskID, lockAware ); } +Database openDBOnServer( Reference>> const& db, TaskPriority taskID, bool enableLocalityLoadBalance, bool lockAware ) { + Reference> info( new AsyncVar ); + return DatabaseContext::create( info, extractClientInfo(db, info), enableLocalityLoadBalance ? db->get().read().myLocality : LocalityData(), enableLocalityLoadBalance, taskID, lockAware ); +} + struct ErrorInfo { Error error; const Role &role; @@ -725,11 +737,12 @@ ACTOR Future monitorServerDBInfo( Referenceget().present() ? brokenPromiseToNever( ccInterface->get().get().getServerDBInfo.getReply( req ) ) : Never() ) ) { - TraceEvent("GotServerDBInfoChange").detail("ChangeID", ni.id).detail("MasterID", ni.master.id()) - .detail("RatekeeperID", ni.ratekeeper.present() ? ni.ratekeeper.get().id() : UID()) - .detail("DataDistributorID", ni.distributor.present() ? ni.distributor.get().id() : UID()); - ServerDBInfo localInfo = ni; + when( CachedSerialization ni = wait( ccInterface->get().present() ? 
brokenPromiseToNever( ccInterface->get().get().getServerDBInfo.getReply( req ) ) : Never() ) ) { + ServerDBInfo localInfo = ni.read(); + TraceEvent("GotServerDBInfoChange").detail("ChangeID", localInfo.id).detail("MasterID", localInfo.master.id()) + .detail("RatekeeperID", localInfo.ratekeeper.present() ? localInfo.ratekeeper.get().id() : UID()) + .detail("DataDistributorID", localInfo.distributor.present() ? localInfo.distributor.get().id() : UID()); + localInfo.myLocality = locality; dbInfo->set(localInfo); } diff --git a/flow/ObjectSerializerTraits.h b/flow/ObjectSerializerTraits.h index 43fa6cfc03..2f560f441c 100644 --- a/flow/ObjectSerializerTraits.h +++ b/flow/ObjectSerializerTraits.h @@ -96,6 +96,12 @@ struct serializable_traits : std::false_type { static void serialize(Archiver& ar, T& v); }; +template +struct serialize_raw : std::false_type { + template + static uint8_t* save_raw(Context& context, const T& obj); +}; + template struct vector_like_traits : std::false_type { // Write this at the beginning of the buffer diff --git a/flow/flat_buffers.cpp b/flow/flat_buffers.cpp index adb5da3312..9d1211b67f 100644 --- a/flow/flat_buffers.cpp +++ b/flow/flat_buffers.cpp @@ -421,7 +421,8 @@ TEST_CASE("/flow/FlatBuffers/file_identifier") { TestContext context{arena}; const uint8_t* out; constexpr FileIdentifier file_identifier{ 1234 }; - out = save_members(context, file_identifier); + Y1 y1; + out = save_members(context, file_identifier, y1); // print_buffer(out, arena.get_size(out)); ASSERT(read_file_identifier(out) == file_identifier); return Void(); diff --git a/flow/flat_buffers.h b/flow/flat_buffers.h index 54b3368916..4794773a85 100644 --- a/flow/flat_buffers.h +++ b/flow/flat_buffers.h @@ -1110,7 +1110,7 @@ uint8_t* save(Context& context, const Root& root, FileIdentifier file_identifier save_with_vtables(root, vtableset, precompute_size, &vtable_start, file_identifier, context); uint8_t* out = context.allocate(precompute_size.current_buffer_size); 
WriteToBuffer writeToBuffer{ context, precompute_size.current_buffer_size, vtable_start, out, - precompute_size.writeToOffsets.begin() }; + precompute_size.writeToOffsets.begin() }; save_with_vtables(root, vtableset, writeToBuffer, &vtable_start, file_identifier, context); return out; } @@ -1122,10 +1122,15 @@ void load(Root& root, const uint8_t* in, Context& context) { } // namespace detail -template -uint8_t* save_members(Context& context, FileIdentifier file_identifier, Members&... members) { - const auto& root = detail::fake_root(members...); - return detail::save(context, root, file_identifier); +template +uint8_t* save_members(Context& context, FileIdentifier file_identifier, const FirstMember& first, + const Members&... members) { + if constexpr (serialize_raw::value) { + return serialize_raw::save_raw(context, first); + } else { + const auto& root = detail::fake_root(const_cast(first), const_cast(members)...); + return detail::save(context, root, file_identifier); + } } template @@ -1158,6 +1163,7 @@ struct EnsureTable template void serialize(Archive& ar) { if constexpr (is_fb_function) { + // This is only for vtable collection. Load and save use the LoadSaveHelper specialization below if constexpr (detail::expect_serialize_member) { if constexpr (serializable_traits::value) { serializable_traits::serialize(ar, t); @@ -1172,7 +1178,41 @@ struct EnsureTable } } T& asUnderlyingType() { return t; } + const T& asUnderlyingType() const { return t; } private: T t; }; + +namespace detail { + +// Ensure if there's a LoadSaveHelper specialization available for T it gets used. 
+template <class T, class Context>
+struct LoadSaveHelper<EnsureTable<T>, Context> : Context {
+	LoadSaveHelper(const Context& context) : Context(context), alreadyATable(context), wrapInTable(context) {}
+
+	void load(EnsureTable<T>& member, const uint8_t* current) {
+		if constexpr (expect_serialize_member<T>) {
+			alreadyATable.load(member.asUnderlyingType(), current);
+		} else {
+			FakeRoot<T> t{ member.asUnderlyingType() };
+			wrapInTable.load(t, current);
+		}
+	}
+
+	template <class Writer>
+	RelativeOffset save(const EnsureTable<T>& member, Writer& writer, const VTableSet* vtables) {
+		if constexpr (expect_serialize_member<T>) {
+			return alreadyATable.save(member.asUnderlyingType(), writer, vtables);
+		} else {
+			FakeRoot<T> t{ const_cast<T&>(member.asUnderlyingType()) };
+			return wrapInTable.save(t, writer, vtables);
+		}
+	}
+
+private:
+	LoadSaveHelper<T, Context> alreadyATable;
+	LoadSaveHelper<FakeRoot<T>, Context> wrapInTable;
+};
+
+} // namespace detail
diff --git a/flow/flow.h b/flow/flow.h
index 581255e96f..1d32e8a520 100644
--- a/flow/flow.h
+++ b/flow/flow.h
@@ -235,6 +235,106 @@ struct union_like_traits<std::variant<Ts...>> : std::true_type {
 	}
 };
 
+template <class T>
+class CachedSerialization {
+public:
+	constexpr static FileIdentifier file_identifier = FileIdentifierFor<T>::value;
+
+	//FIXME: this code will not work for caching a direct serialization from ObjectWriter, because it adds an ErrorOr,
+	// we should create a separate SerializeType for direct serialization
+	enum class SerializeType { None, Binary, Object };
+
+	CachedSerialization() : cacheType(SerializeType::None) {}
+	explicit CachedSerialization(const T& data) : data(data), cacheType(SerializeType::None) {}
+
+	const T& read() const { return data; }
+
+	T& mutate() {
+		cacheType = SerializeType::None;
+		return data;
+	}
+
+	//This should only be called from the ObjectSerializer load function
+	Standalone<StringRef> getCache() const {
+		if(cacheType != SerializeType::Object) {
+			cache = ObjectWriter::toValue(ErrorOr<EnsureTable<T>>(data), AssumeVersion(currentProtocolVersion));
+			cacheType = SerializeType::Object;
+		}
+		return cache;
+	}
+
+	bool operator == (CachedSerialization const& rhs) const {
+		return data == rhs.data;
+	}
+	bool operator != (CachedSerialization const& rhs) const {
+		return !(*this == rhs);
+	}
+	bool operator < (CachedSerialization const& rhs) const {
+		return data < rhs.data;
+	}
+
+	template <class Ar>
+	void serialize(Ar& ar) {
+		if constexpr (is_fb_function<Ar>) {
+			// Suppress vtable collection. Save and load are implemented via the specializations below
+		} else {
+			if (Ar::isDeserializing) {
+				cache = Standalone<StringRef>();
+				cacheType = SerializeType::None;
+				serializer(ar, data);
+			} else {
+				if (cacheType != SerializeType::Binary) {
+					cache = BinaryWriter::toValue(data, AssumeVersion(currentProtocolVersion));
+					cacheType = SerializeType::Binary;
+				}
+				ar.serializeBytes(const_cast<uint8_t*>(cache.begin()), cache.size());
+			}
+		}
+	}
+
+private:
+	T data;
+	mutable SerializeType cacheType;
+	mutable Standalone<StringRef> cache;
+};
+
+// this special case is needed - the code expects
+// Standalone<StringRef> and T to be equivalent for serialization
+namespace detail {
+
+template <class T, class Context>
+struct LoadSaveHelper<CachedSerialization<T>, Context> : Context {
+	LoadSaveHelper(const Context& context)
+	  : Context(context), helper(context) {}
+
+	void load(CachedSerialization<T>& member, const uint8_t* current) {
+		helper.load(member.mutate(), current);
+	}
+
+	template <class Writer>
+	RelativeOffset save(const CachedSerialization<T>& member, Writer& writer, const VTableSet* vtables) {
+		throw internal_error();
+	}
+
+private:
+	LoadSaveHelper<T, Context> helper;
+};
+
+} // namespace detail
+
+template <class V>
+struct serialize_raw<ErrorOr<EnsureTable<CachedSerialization<V>>>> : std::true_type {
+	template <class Context>
+	static uint8_t* save_raw(Context& context, const ErrorOr<EnsureTable<CachedSerialization<V>>>& obj) {
+		auto cache = obj.present() ? obj.get().asUnderlyingType().getCache()
+		                           : ObjectWriter::toValue(ErrorOr<EnsureTable<CachedSerialization<V>>>(obj.getError()),
+		                                                   AssumeVersion(currentProtocolVersion));
+		uint8_t* out = context.allocate(cache.size());
+		memcpy(out, cache.begin(), cache.size());
+		return out;
+	}
+};
+
 template <class T>
 struct Callback {
 	Callback *prev, *next;