Merge branch 'master' of github.com:apple/foundationdb into refactor-fdbcli-2

This commit is contained in:
Chaoguang Lin 2021-06-09 02:29:18 +00:00
commit 1f7acc8d02
87 changed files with 1123 additions and 376 deletions

1
.gitignore vendored
View File

@ -8,6 +8,7 @@ bindings/java/foundationdb-tests*.jar
bindings/java/fdb-java-*-sources.jar
packaging/msi/FDBInstaller.msi
builds/
cmake-build-debug/
# Generated source, build, and packaging files
*.g.cpp
*.g.h

View File

@ -78,6 +78,8 @@ if(NOT WIN32)
test/unit/fdb_api.cpp
test/unit/fdb_api.hpp)
set(UNIT_TEST_VERSION_510_SRCS test/unit/unit_tests_version_510.cpp)
if(OPEN_FOR_IDE)
add_library(fdb_c_performance_test OBJECT test/performance_test.c test/test.h)
add_library(fdb_c_ryw_benchmark OBJECT test/ryw_benchmark.c test/test.h)
@ -85,6 +87,7 @@ if(NOT WIN32)
add_library(mako OBJECT ${MAKO_SRCS})
add_library(fdb_c_setup_tests OBJECT test/unit/setup_tests.cpp)
add_library(fdb_c_unit_tests OBJECT ${UNIT_TEST_SRCS})
add_library(fdb_c_unit_tests_version_510 OBJECT ${UNIT_TEST_VERSION_510_SRCS})
else()
add_executable(fdb_c_performance_test test/performance_test.c test/test.h)
add_executable(fdb_c_ryw_benchmark test/ryw_benchmark.c test/test.h)
@ -92,6 +95,7 @@ if(NOT WIN32)
add_executable(mako ${MAKO_SRCS})
add_executable(fdb_c_setup_tests test/unit/setup_tests.cpp)
add_executable(fdb_c_unit_tests ${UNIT_TEST_SRCS})
add_executable(fdb_c_unit_tests_version_510 ${UNIT_TEST_VERSION_510_SRCS})
strip_debug_symbols(fdb_c_performance_test)
strip_debug_symbols(fdb_c_ryw_benchmark)
strip_debug_symbols(fdb_c_txn_size_test)
@ -104,8 +108,10 @@ if(NOT WIN32)
add_dependencies(fdb_c_unit_tests doctest)
target_include_directories(fdb_c_setup_tests PUBLIC ${DOCTEST_INCLUDE_DIR})
target_include_directories(fdb_c_unit_tests PUBLIC ${DOCTEST_INCLUDE_DIR})
target_include_directories(fdb_c_unit_tests_version_510 PUBLIC ${DOCTEST_INCLUDE_DIR})
target_link_libraries(fdb_c_setup_tests PRIVATE fdb_c Threads::Threads)
target_link_libraries(fdb_c_unit_tests PRIVATE fdb_c Threads::Threads)
target_link_libraries(fdb_c_unit_tests_version_510 PRIVATE fdb_c Threads::Threads)
# do not set RPATH for mako
set_property(TARGET mako PROPERTY SKIP_BUILD_RPATH TRUE)
@ -135,6 +141,11 @@ if(NOT WIN32)
COMMAND $<TARGET_FILE:fdb_c_unit_tests>
@CLUSTER_FILE@
fdb)
add_fdbclient_test(
NAME fdb_c_unit_tests_version_510
COMMAND $<TARGET_FILE:fdb_c_unit_tests_version_510>
@CLUSTER_FILE@
fdb)
add_fdbclient_test(
NAME fdb_c_external_client_unit_tests
COMMAND $<TARGET_FILE:fdb_c_unit_tests>
@ -158,6 +169,10 @@ set_target_properties(c_workloads PROPERTIES
LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/share/foundationdb")
target_link_libraries(c_workloads PUBLIC fdb_c)
if (NOT WIN32 AND NOT APPLE AND NOT OPEN_FOR_IDE)
target_link_options(c_workloads PRIVATE "LINKER:--version-script=${CMAKE_CURRENT_SOURCE_DIR}/external_workload.map,-z,nodelete")
endif()
# TODO: re-enable once the old vcxproj-based build system is removed.
#generate_export_header(fdb_c EXPORT_MACRO_NAME "DLLEXPORT"
# EXPORT_FILE_NAME ${CMAKE_CURRENT_BINARY_DIR}/foundationdb/fdb_c_export.h)

View File

@ -0,0 +1,7 @@
{
global:
workloadFactory;
local:
*;
};

View File

@ -74,10 +74,41 @@ def write_unix_asm(asmfile, functions, prefix):
for f in functions:
asmfile.write("\n.globl %s%s\n" % (prefix, f))
asmfile.write("%s%s:\n" % (prefix, f))
# These assembly implementations of versioned fdb c api functions must have the following properties.
#
# 1. Don't require dynamic relocation.
#
# 2. Perform a tail-call to the function pointer that works for a
# function with any number of arguments. For example, since registers x0-x7 are used
# to pass arguments in the Arm calling convention we must not use x0-x7
# here.
#
# You can compile this example c program to get a rough idea of how to
# load the extern symbol and make a tail call.
#
# $ cat test.c
# typedef int (*function)();
# extern function f;
# int g() { return f(); }
# $ cc -S -O3 -fPIC test.c && grep -A 10 '^g:' test.[sS]
# g:
# .LFB0:
# .cfi_startproc
# adrp x0, :got:f
# ldr x0, [x0, #:got_lo12:f]
# ldr x0, [x0]
# br x0
# .cfi_endproc
# .LFE0:
# .size g, .-g
# .ident "GCC: (GNU) 8.3.1 20190311 (Red Hat 8.3.1-3)"
if platform == "linux-aarch64":
asmfile.write("\tldr x16, =fdb_api_ptr_%s\n" % (f))
asmfile.write("\tldr x16, [x16]\n")
asmfile.write("\tbr x16\n")
asmfile.write("\tadrp x8, :got:fdb_api_ptr_%s\n" % (f))
asmfile.write("\tldr x8, [x8, #:got_lo12:fdb_api_ptr_%s]\n" % (f))
asmfile.write("\tldr x8, [x8]\n")
asmfile.write("\tbr x8\n")
else:
asmfile.write(
"\tmov r11, qword ptr [%sfdb_api_ptr_%s@GOTPCREL+rip]\n" % (prefix, f))

View File

@ -219,7 +219,7 @@ GetRangeResult get_range(fdb::Transaction& tr,
for (int i = 0; i < out_count; ++i) {
std::string key((const char*)out_kv[i].key, out_kv[i].key_length);
std::string value((const char*)out_kv[i].value, out_kv[i].value_length);
results.push_back(std::make_pair(key, value));
results.emplace_back(key, value);
}
return GetRangeResult{ results, out_more != 0, 0 };
}

View File

@ -0,0 +1,118 @@
/*
* unit_tests_header_510.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Unit tests for the FoundationDB C API, at api header version 510
#include "fdb_c_options.g.h"
#include <thread>
#define FDB_API_VERSION 510
static_assert(FDB_API_VERSION == 510, "Don't change this! This test intentionally tests an old api header version");
#include <foundationdb/fdb_c.h>
#define DOCTEST_CONFIG_IMPLEMENT
#include "doctest.h"
#include "flow/config.h"
void fdb_check(fdb_error_t e) {
if (e) {
std::cerr << fdb_get_error(e) << std::endl;
std::abort();
}
}
std::string clusterFilePath;
std::string prefix;
FDBDatabase* db;
struct Future {
FDBFuture* f = nullptr;
Future() = default;
explicit Future(FDBFuture* f) : f(f) {}
~Future() {
if (f)
fdb_future_destroy(f);
}
};
struct Transaction {
FDBTransaction* tr = nullptr;
Transaction() = default;
explicit Transaction(FDBTransaction* tr) : tr(tr) {}
~Transaction() {
if (tr)
fdb_transaction_destroy(tr);
}
};
// TODO add more tests. The motivation for this test for now is to test the
// assembly code that handles emulating older api versions, but there's no
// reason why this shouldn't also test api version 510 specific behavior.
TEST_CASE("GRV") {
Transaction tr;
fdb_check(fdb_database_create_transaction(db, &tr.tr));
Future grv{ fdb_transaction_get_read_version(tr.tr) };
fdb_check(fdb_future_block_until_ready(grv.f));
}
int main(int argc, char** argv) {
if (argc < 3) {
std::cout << "Unit tests for the FoundationDB C API.\n"
<< "Usage: " << argv[0] << " /path/to/cluster_file key_prefix [doctest args]" << std::endl;
return 1;
}
fdb_check(fdb_select_api_version(FDB_API_VERSION));
doctest::Context context;
context.applyCommandLine(argc, argv);
fdb_check(fdb_setup_network());
std::thread network_thread{ &fdb_run_network };
{
FDBCluster* cluster;
Future clusterFuture{ fdb_create_cluster(argv[1]) };
fdb_check(fdb_future_block_until_ready(clusterFuture.f));
fdb_check(fdb_future_get_cluster(clusterFuture.f, &cluster));
Future databaseFuture{ fdb_cluster_create_database(cluster, (const uint8_t*)"DB", 2) };
fdb_check(fdb_future_block_until_ready(databaseFuture.f));
fdb_check(fdb_future_get_database(databaseFuture.f, &db));
fdb_cluster_destroy(cluster);
}
clusterFilePath = std::string(argv[1]);
prefix = argv[2];
int res = context.run();
fdb_database_destroy(db);
if (context.shouldExit()) {
fdb_check(fdb_stop_network());
network_thread.join();
return res;
}
fdb_check(fdb_stop_network());
network_thread.join();
return res;
}

View File

@ -138,6 +138,11 @@ else()
add_library(fdb_java SHARED fdbJNI.cpp)
add_library(java_workloads SHARED JavaWorkload.cpp)
endif()
if (NOT WIN32 AND NOT APPLE AND NOT OPEN_FOR_IDE)
target_link_options(java_workloads PRIVATE "LINKER:--version-script=${CMAKE_SOURCE_DIR}/bindings/c/external_workload.map,-z,nodelete")
endif()
target_include_directories(fdb_java PRIVATE ${JNI_INCLUDE_DIRS})
# libfdb_java.so is loaded by fdb-java.jar and doesn't need to depened on jvm shared libraries.
target_link_libraries(fdb_java PRIVATE fdb_c)

View File

@ -17,6 +17,7 @@ Performance
* Increased performance of dr_agent when copying the mutation log. The ``COPY_LOG_BLOCK_SIZE``, ``COPY_LOG_BLOCKS_PER_TASK``, ``COPY_LOG_PREFETCH_BLOCKS``, ``COPY_LOG_READ_AHEAD_BYTES`` and ``COPY_LOG_TASK_DURATION_NANOS`` knobs can be set. `(PR #3436) <https://github.com/apple/foundationdb/pull/3436>`_
* Reduced the number of connections required by the multi-version client when loading external clients. When connecting to 7.0 clusters, only one connection with version 6.2 or larger will be used. With older clusters, at most two connections with version 6.2 or larger will be used. Clients older than version 6.2 will continue to create an additional connection each. `(PR #4667) <https://github.com/apple/foundationdb/pull/4667>`_
* Reduce CPU overhead of load balancing on client processes. `(PR #4561) <https://github.com/apple/foundationdb/pull/4561>`_
Reliability
-----------
@ -34,16 +35,21 @@ Status
* Added ``cluster.bounce_impact`` section to status to report if there will be any extra effects when bouncing the cluster, and if so, the reason for those effects. `(PR #4770) <https://github.com/apple/foundationdb/pull/4770>`_
* Added ``fetched_versions`` to the storage metrics section of status to report how fast a storage server is catching up in versions. `(PR #4770) <https://github.com/apple/foundationdb/pull/4770>`_
* Added ``fetches_from_logs`` to the storage metrics section of status to report how frequently a storage server fetches updates from transaction logs. `(PR #4770) <https://github.com/apple/foundationdb/pull/4770>`_
* Added ``seconds_since_last_recovered`` to the ``cluster.recovery_state`` section to report how long it has been since the cluster recovered to the point where it is able to accept requests. `(PR #3759) <https://github.com/apple/foundationdb/pull/3759>`_
Bindings
--------
* Python: The function ``get_estimated_range_size_bytes`` will now throw an error if the ``begin_key`` or ``end_key`` is ``None``. `(PR #3394) <https://github.com/apple/foundationdb/pull/3394>`_
* C: Added a function, ``fdb_database_reboot_worker``, to reboot or suspend the specified process. `(PR #4094) <https://github.com/apple/foundationdb/pull/4094>`_
* C: Added a function, ``fdb_database_force_recovery_with_data_loss``, to force the database to recover into the given datacenter. `(PR #4420) <https://github.com/apple/foundationdb/pull/4220>`_
* C: Added a function, ``fdb_database_create_snapshot``, to create a snapshot of the database. `(PR #) <https://github.com/apple/foundationdb/pull/4241/files>`_
* C: Added a function, ``fdb_database_create_snapshot``, to create a snapshot of the database. `(PR #4241) <https://github.com/apple/foundationdb/pull/4241>`_
* C: Added ``fdb_database_get_main_thread_busyness`` function to report how busy a client's main thread is. `(PR #4504) <https://github.com/apple/foundationdb/pull/4504>`_
* Java: Added ``Database.getMainThreadBusyness`` function to report how busy a client's main thread is. `(PR #4564) <https://github.com/apple/foundationdb/pull/4564>`_
Other Changes
-------------
* When ``fdbmonitor`` dies, all of its child processes are now killed. `(PR #3841) <https://github.com/apple/foundationdb/pull/3841>`_
* The ``foundationdb`` service installed by the RPM packages will now automatically restart ``fdbmonitor`` after 60 seconds when it fails. `(PR #3841) <https://github.com/apple/foundationdb/pull/3841>`_
Earlier release notes
---------------------

View File

@ -3357,7 +3357,7 @@ int main(int argc, char* argv[]) {
deleteData = true;
break;
case OPT_MIN_CLEANUP_SECONDS:
knobs.push_back(std::make_pair("min_cleanup_seconds", args->OptionArg()));
knobs.emplace_back("min_cleanup_seconds", args->OptionArg());
break;
case OPT_FORCE:
forceAction = true;
@ -3452,7 +3452,7 @@ int main(int argc, char* argv[]) {
return FDB_EXIT_ERROR;
}
syn = syn.substr(7);
knobs.push_back(std::make_pair(syn, args->OptionArg()));
knobs.emplace_back(syn, args->OptionArg());
break;
}
case OPT_BACKUPKEYS:
@ -4212,7 +4212,7 @@ int main(int argc, char* argv[]) {
s = s.substr(LiteralStringRef("struct ").size());
#endif
typeNames.push_back(std::make_pair(s, i->first));
typeNames.emplace_back(s, i->first);
}
std::sort(typeNames.begin(), typeNames.end());
for (int i = 0; i < typeNames.size(); i++) {

View File

@ -35,6 +35,7 @@
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/TagThrottle.h"
#include "fdbclient/Tuple.h"
#include "fdbclient/ThreadSafeTransaction.h"
#include "flow/DeterministicRandom.h"
@ -3084,7 +3085,7 @@ struct CLIOptions {
return FDB_EXIT_ERROR;
}
syn = syn.substr(7);
knobs.push_back(std::make_pair(syn, args.OptionArg()));
knobs.emplace_back(syn, args.OptionArg());
break;
}
case OPT_DEBUG_TLS:

View File

@ -57,7 +57,8 @@ set(FDBCLIENT_SRCS
SpecialKeySpace.actor.h
ReadYourWrites.actor.cpp
ReadYourWrites.h
RestoreWorkerInterface.actor.h
RestoreInterface.cpp
RestoreInterface.h
RunTransaction.actor.h
RYWIterator.cpp
RYWIterator.h

View File

@ -29,7 +29,6 @@
#include "fdbclient/FDBTypes.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbserver/RatekeeperInterface.h"
#include "fdbclient/TagThrottle.h"
#include "fdbclient/GlobalConfig.h"

View File

@ -23,7 +23,7 @@
#pragma once
#include "fdbclient/FDBTypes.h"
#include "fdbserver/Knobs.h"
#include "fdbclient/Knobs.h"
// The versioned message has wire format : -1, version, messages
static const int32_t VERSION_HEADER = -1;
@ -95,7 +95,7 @@ struct MutationRef {
// Amplify atomicOp size to consider such extra workload.
// A good value for FASTRESTORE_ATOMICOP_WEIGHT needs experimental evaluations.
if (isAtomicOp()) {
return totalSize() * SERVER_KNOBS->FASTRESTORE_ATOMICOP_WEIGHT;
return totalSize() * CLIENT_KNOBS->FASTRESTORE_ATOMICOP_WEIGHT;
} else {
return totalSize();
}

View File

@ -95,7 +95,7 @@ public:
if (itr != optionsIndexMap.end()) {
options.erase(itr->second);
}
options.push_back(std::make_pair(option, value));
options.emplace_back(option, value);
optionsIndexMap[option] = --options.end();
}
@ -107,4 +107,4 @@ public:
type::optionInfo.insert( \
var, FDBOptionInfo(name, comment, parameterComment, hasParameter, hidden, persistent, defaultFor));
#endif
#endif

View File

@ -483,7 +483,9 @@ inline Key keyAfter(const KeyRef& key) {
Standalone<StringRef> r;
uint8_t* s = new (r.arena()) uint8_t[key.size() + 1];
memcpy(s, key.begin(), key.size());
if (key.size() > 0) {
memcpy(s, key.begin(), key.size());
}
s[key.size()] = 0;
((StringRef&)r) = StringRef(s, key.size() + 1);
return r;

View File

@ -23,6 +23,7 @@
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/RestoreInterface.h"
#include "fdbclient/Status.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/KeyBackedTypes.h"

View File

@ -173,6 +173,7 @@ void ClientKnobs::initialize(bool randomize) {
init( BACKUP_STATUS_DELAY, 40.0 );
init( BACKUP_STATUS_JITTER, 0.05 );
init( MIN_CLEANUP_SECONDS, 3600.0 );
init( FASTRESTORE_ATOMICOP_WEIGHT, 1 ); if( randomize && BUGGIFY ) { FASTRESTORE_ATOMICOP_WEIGHT = deterministicRandom()->random01() * 200 + 1; }
// Configuration
init( DEFAULT_AUTO_COMMIT_PROXIES, 3 );

View File

@ -168,6 +168,7 @@ public:
double BACKUP_STATUS_DELAY;
double BACKUP_STATUS_JITTER;
double MIN_CLEANUP_SECONDS;
int64_t FASTRESTORE_ATOMICOP_WEIGHT; // workload amplication factor for atomic op
// Configuration
int32_t DEFAULT_AUTO_COMMIT_PROXIES;

View File

@ -785,7 +785,7 @@ ConfigureAutoResult parseConfig(StatusObject const& status) {
}
if (processClass.classType() != ProcessClass::TesterClass) {
machine_processes[machineId].push_back(std::make_pair(addr, processClass));
machine_processes[machineId].emplace_back(addr, processClass);
processCount++;
}
}
@ -1315,7 +1315,7 @@ struct AutoQuorumChange final : IQuorumChange {
vector<NetworkAddress> oldCoordinators,
Reference<ClusterConnectionFile> ccf,
CoordinatorsResult& err) override {
return getDesired(this, tr, oldCoordinators, ccf, &err);
return getDesired(Reference<AutoQuorumChange>::addRef(this), tr, oldCoordinators, ccf, &err);
}
ACTOR static Future<int> getRedundancy(AutoQuorumChange* self, Transaction* tr) {
@ -1378,7 +1378,7 @@ struct AutoQuorumChange final : IQuorumChange {
return true; // The status quo seems fine
}
ACTOR static Future<vector<NetworkAddress>> getDesired(AutoQuorumChange* self,
ACTOR static Future<vector<NetworkAddress>> getDesired(Reference<AutoQuorumChange> self,
Transaction* tr,
vector<NetworkAddress> oldCoordinators,
Reference<ClusterConnectionFile> ccf,
@ -1386,7 +1386,7 @@ struct AutoQuorumChange final : IQuorumChange {
state int desiredCount = self->desired;
if (desiredCount == -1) {
int redundancy = wait(getRedundancy(self, tr));
int redundancy = wait(getRedundancy(self.getPtr(), tr));
desiredCount = redundancy * 2 - 1;
}
@ -1415,7 +1415,7 @@ struct AutoQuorumChange final : IQuorumChange {
}
if (checkAcceptable) {
bool ok = wait(isAcceptable(self, tr, oldCoordinators, ccf, desiredCount, &excluded));
bool ok = wait(isAcceptable(self.getPtr(), tr, oldCoordinators, ccf, desiredCount, &excluded));
if (ok)
return oldCoordinators;
}

View File

@ -434,9 +434,9 @@ Optional<std::pair<LeaderInfo, bool>> getLeader(const vector<Optional<LeaderInfo
maskedNominees.reserve(nominees.size());
for (int i = 0; i < nominees.size(); i++) {
if (nominees[i].present()) {
maskedNominees.push_back(std::make_pair(
maskedNominees.emplace_back(
UID(nominees[i].get().changeID.first() & LeaderInfo::changeIDMask, nominees[i].get().changeID.second()),
i));
i);
}
}
@ -586,7 +586,7 @@ OpenDatabaseRequest ClientData::getRequest() {
auto& entry = issueMap[it];
entry.count++;
if (entry.examples.size() < CLIENT_KNOBS->CLIENT_EXAMPLE_AMOUNT) {
entry.examples.push_back(std::make_pair(ci.first, ci.second.traceLogGroup));
entry.examples.emplace_back(ci.first, ci.second.traceLogGroup);
}
}
if (ci.second.versions.size()) {
@ -597,19 +597,19 @@ OpenDatabaseRequest ClientData::getRequest() {
auto& entry = versionMap[it];
entry.count++;
if (entry.examples.size() < CLIENT_KNOBS->CLIENT_EXAMPLE_AMOUNT) {
entry.examples.push_back(std::make_pair(ci.first, ci.second.traceLogGroup));
entry.examples.emplace_back(ci.first, ci.second.traceLogGroup);
}
}
auto& maxEntry = maxProtocolMap[maxProtocol];
maxEntry.count++;
if (maxEntry.examples.size() < CLIENT_KNOBS->CLIENT_EXAMPLE_AMOUNT) {
maxEntry.examples.push_back(std::make_pair(ci.first, ci.second.traceLogGroup));
maxEntry.examples.emplace_back(ci.first, ci.second.traceLogGroup);
}
} else {
auto& entry = versionMap[ClientVersionRef()];
entry.count++;
if (entry.examples.size() < CLIENT_KNOBS->CLIENT_EXAMPLE_AMOUNT) {
entry.examples.push_back(std::make_pair(ci.first, ci.second.traceLogGroup));
entry.examples.emplace_back(ci.first, ci.second.traceLogGroup);
}
}
}

View File

@ -595,7 +595,7 @@ Reference<IDatabase> DLApi::createDatabase(const char* clusterFilePath) {
void DLApi::addNetworkThreadCompletionHook(void (*hook)(void*), void* hookParameter) {
MutexHolder holder(lock);
threadCompletionHooks.push_back(std::make_pair(hook, hookParameter));
threadCompletionHooks.emplace_back(hook, hookParameter);
}
// MultiVersionTransaction
@ -947,7 +947,7 @@ void MultiVersionDatabase::setOption(FDBDatabaseOptions::Option option, Optional
value.castTo<Standalone<StringRef>>());
}
dbState->options.push_back(std::make_pair(option, value.castTo<Standalone<StringRef>>()));
dbState->options.emplace_back(option, value.castTo<Standalone<StringRef>>());
if (dbState->db) {
dbState->db->setOption(option, value);
@ -1559,7 +1559,7 @@ void MultiVersionApi::setNetworkOptionInternal(FDBNetworkOptions::Option option,
runOnExternalClientsAllThreads(
[option, value](Reference<ClientInfo> client) { client->api->setNetworkOption(option, value); });
} else {
options.push_back(std::make_pair(option, value.castTo<Standalone<StringRef>>()));
options.emplace_back(option, value.castTo<Standalone<StringRef>>());
}
}
}

View File

@ -5876,3 +5876,23 @@ Future<Void> DatabaseContext::createSnapshot(StringRef uid, StringRef snapshot_c
}
return createSnapshotActor(this, UID::fromString(uid_str), snapshot_command);
}
ACTOR Future<Void> setPerpetualStorageWiggle(Database cx, bool enable, bool lock_aware) {
state ReadYourWritesTransaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if(lock_aware) {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
}
tr.set(perpetualStorageWiggleKey, enable ? LiteralStringRef("1") : LiteralStringRef("0"));
wait(tr.commit());
break;
}
catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
}

View File

@ -407,5 +407,10 @@ ACTOR Future<bool> checkSafeExclusions(Database cx, vector<AddressExclusion> exc
inline uint64_t getWriteOperationCost(uint64_t bytes) {
return bytes / std::max(1, CLIENT_KNOBS->WRITE_COST_BYTE_FACTOR) + 1;
}
// Create a transaction to set the value of system key \xff/conf/perpetual_storage_wiggle. If enable == true, the value
// will be 1. Otherwise, the value will be 0.
ACTOR Future<Void> setPerpetualStorageWiggle(Database cx, bool enable, bool lock_aware = false);
#include "flow/unactorcompiler.h"
#endif

View File

@ -0,0 +1,56 @@
/*
* RestoreInterface.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/RestoreInterface.h"
#include "flow/serialize.h"
const KeyRef restoreRequestDoneKey = "\xff\x02/restoreRequestDone"_sr;
const KeyRef restoreRequestTriggerKey = "\xff\x02/restoreRequestTrigger"_sr;
const KeyRangeRef restoreRequestKeys("\xff\x02/restoreRequests/"_sr, "\xff\x02/restoreRequests0"_sr);
// Encode and decode restore request value
Value restoreRequestTriggerValue(UID randomID, int numRequests) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withRestoreRequestTriggerValue()));
wr << numRequests;
wr << randomID;
return wr.toValue();
}
int decodeRestoreRequestTriggerValue(ValueRef const& value) {
int s;
UID randomID;
BinaryReader reader(value, IncludeVersion());
reader >> s;
reader >> randomID;
return s;
}
Key restoreRequestKeyFor(int index) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(restoreRequestKeys.begin);
wr << index;
return wr.toValue();
}
Value restoreRequestValue(RestoreRequest const& request) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withRestoreRequestValue()));
wr << request;
return wr.toValue();
}

View File

@ -0,0 +1,99 @@
/*
* RestoreInterface.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
struct RestoreCommonReply {
constexpr static FileIdentifier file_identifier = 5808787;
UID id; // unique ID of the server who sends the reply
bool isDuplicated;
RestoreCommonReply() = default;
explicit RestoreCommonReply(UID id, bool isDuplicated = false) : id(id), isDuplicated(isDuplicated) {}
std::string toString() const {
std::stringstream ss;
ss << "ServerNodeID:" << id.toString() << " isDuplicated:" << isDuplicated;
return ss.str();
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id, isDuplicated);
}
};
struct RestoreRequest {
constexpr static FileIdentifier file_identifier = 16035338;
int index;
Key tagName;
Key url;
Version targetVersion;
KeyRange range;
UID randomUid;
// Every key in backup will first removePrefix and then addPrefix;
// Simulation testing does not cover when both addPrefix and removePrefix exist yet.
Key addPrefix;
Key removePrefix;
ReplyPromise<struct RestoreCommonReply> reply;
RestoreRequest() = default;
explicit RestoreRequest(const int index,
const Key& tagName,
const Key& url,
Version targetVersion,
const KeyRange& range,
const UID& randomUid,
Key& addPrefix,
Key removePrefix)
: index(index), tagName(tagName), url(url), targetVersion(targetVersion), range(range), randomUid(randomUid),
addPrefix(addPrefix), removePrefix(removePrefix) {}
// To change this serialization, ProtocolVersion::RestoreRequestValue must be updated, and downgrades need to be
// considered
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, index, tagName, url, targetVersion, range, randomUid, addPrefix, removePrefix, reply);
}
std::string toString() const {
std::stringstream ss;
ss << "index:" << std::to_string(index) << " tagName:" << tagName.contents().toString()
<< " url:" << url.contents().toString() << " targetVersion:" << std::to_string(targetVersion)
<< " range:" << range.toString() << " randomUid:" << randomUid.toString()
<< " addPrefix:" << addPrefix.toString() << " removePrefix:" << removePrefix.toString();
return ss.str();
}
};
extern const KeyRef restoreRequestDoneKey;
extern const KeyRef restoreRequestTriggerKey;
extern const KeyRangeRef restoreRequestKeys;
Value restoreRequestTriggerValue(UID randomID, int numRequests);
int decodeRequestRequestTriggerValue(ValueRef const&);
Key restoreRequestKeyFor(int index);
Value restoreRequestValue(RestoreRequest const&);

View File

@ -960,124 +960,8 @@ const KeyRef mustContainSystemMutationsKey = LiteralStringRef("\xff/mustContainS
const KeyRangeRef monitorConfKeys(LiteralStringRef("\xff\x02/monitorConf/"), LiteralStringRef("\xff\x02/monitorConf0"));
const KeyRef restoreLeaderKey = LiteralStringRef("\xff\x02/restoreLeader");
const KeyRangeRef restoreWorkersKeys(LiteralStringRef("\xff\x02/restoreWorkers/"),
LiteralStringRef("\xff\x02/restoreWorkers0"));
const KeyRef restoreStatusKey = LiteralStringRef("\xff\x02/restoreStatus/");
const KeyRef restoreRequestTriggerKey = LiteralStringRef("\xff\x02/restoreRequestTrigger");
const KeyRef restoreRequestDoneKey = LiteralStringRef("\xff\x02/restoreRequestDone");
const KeyRangeRef restoreRequestKeys(LiteralStringRef("\xff\x02/restoreRequests/"),
LiteralStringRef("\xff\x02/restoreRequests0"));
const KeyRangeRef restoreApplierKeys(LiteralStringRef("\xff\x02/restoreApplier/"),
LiteralStringRef("\xff\x02/restoreApplier0"));
const KeyRef restoreApplierTxnValue = LiteralStringRef("1");
// restoreApplierKeys: track atomic transaction progress to ensure applying atomicOp exactly once
// Version and batchIndex are passed in as LittleEndian,
// they must be converted to BigEndian to maintain ordering in lexical order
const Key restoreApplierKeyFor(UID const& applierID, int64_t batchIndex, Version version) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(restoreApplierKeys.begin);
wr << applierID << bigEndian64(batchIndex) << bigEndian64(version);
return wr.toValue();
}
std::tuple<UID, int64_t, Version> decodeRestoreApplierKey(ValueRef const& key) {
BinaryReader rd(key, Unversioned());
UID applierID;
int64_t batchIndex;
Version version;
rd >> applierID >> batchIndex >> version;
return std::make_tuple(applierID, bigEndian64(batchIndex), bigEndian64(version));
}
// Encode restore worker key for workerID
const Key restoreWorkerKeyFor(UID const& workerID) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(restoreWorkersKeys.begin);
wr << workerID;
return wr.toValue();
}
// Encode restore agent value
const Value restoreWorkerInterfaceValue(RestoreWorkerInterface const& cmdInterf) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withRestoreWorkerInterfaceValue()));
wr << cmdInterf;
return wr.toValue();
}
RestoreWorkerInterface decodeRestoreWorkerInterfaceValue(ValueRef const& value) {
RestoreWorkerInterface s;
BinaryReader reader(value, IncludeVersion());
reader >> s;
return s;
}
// Encode and decode restore request value
// restoreRequestTrigger key
const Value restoreRequestTriggerValue(UID randomID, int const numRequests) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withRestoreRequestTriggerValue()));
wr << numRequests;
wr << randomID;
return wr.toValue();
}
int decodeRestoreRequestTriggerValue(ValueRef const& value) {
int s;
UID randomID;
BinaryReader reader(value, IncludeVersion());
reader >> s;
reader >> randomID;
return s;
}
// restoreRequestDone key
const Value restoreRequestDoneVersionValue(Version readVersion) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withRestoreRequestDoneVersionValue()));
wr << readVersion;
return wr.toValue();
}
Version decodeRestoreRequestDoneVersionValue(ValueRef const& value) {
Version v;
BinaryReader reader(value, IncludeVersion());
reader >> v;
return v;
}
const Key restoreRequestKeyFor(int const& index) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(restoreRequestKeys.begin);
wr << index;
return wr.toValue();
}
const Value restoreRequestValue(RestoreRequest const& request) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withRestoreRequestValue()));
wr << request;
return wr.toValue();
}
RestoreRequest decodeRestoreRequestValue(ValueRef const& value) {
RestoreRequest s;
BinaryReader reader(value, IncludeVersion());
reader >> s;
return s;
}
// TODO: Register restore performance data to restoreStatus key
const Key restoreStatusKeyFor(StringRef statusType) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(restoreStatusKey);
wr << statusType;
return wr.toValue();
}
const Value restoreStatusValue(double val) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withRestoreStatusValue()));
wr << StringRef(std::to_string(val));
return wr.toValue();
}
const KeyRef healthyZoneKey = LiteralStringRef("\xff\x02/healthyZone");
const StringRef ignoreSSFailuresZoneString = LiteralStringRef("IgnoreSSFailures");
const KeyRef rebalanceDDIgnoreKey = LiteralStringRef("\xff\x02/rebalanceDDIgnored");

View File

@ -26,7 +26,6 @@
#include "fdbclient/FDBTypes.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/RestoreWorkerInterface.actor.h"
// Don't warn on constants being defined in this file.
#pragma clang diagnostic push
@ -447,31 +446,6 @@ extern const KeyRef mustContainSystemMutationsKey;
// Key range reserved for storing changes to monitor conf files
extern const KeyRangeRef monitorConfKeys;
// Fast restore
extern const KeyRef restoreLeaderKey;
extern const KeyRangeRef restoreWorkersKeys;
extern const KeyRef restoreStatusKey; // To be used when we measure fast restore performance
extern const KeyRef restoreRequestTriggerKey;
extern const KeyRef restoreRequestDoneKey;
extern const KeyRangeRef restoreRequestKeys;
extern const KeyRangeRef restoreApplierKeys;
extern const KeyRef restoreApplierTxnValue;
const Key restoreApplierKeyFor(UID const& applierID, int64_t batchIndex, Version version);
std::tuple<UID, int64_t, Version> decodeRestoreApplierKey(ValueRef const& key);
const Key restoreWorkerKeyFor(UID const& workerID);
const Value restoreWorkerInterfaceValue(RestoreWorkerInterface const& server);
RestoreWorkerInterface decodeRestoreWorkerInterfaceValue(ValueRef const& value);
const Value restoreRequestTriggerValue(UID randomUID, int const numRequests);
int decodeRestoreRequestTriggerValue(ValueRef const& value);
const Value restoreRequestDoneVersionValue(Version readVersion);
Version decodeRestoreRequestDoneVersionValue(ValueRef const& value);
const Key restoreRequestKeyFor(int const& index);
const Value restoreRequestValue(RestoreRequest const& server);
RestoreRequest decodeRestoreRequestValue(ValueRef const& value);
const Key restoreStatusKeyFor(StringRef statusType);
const Value restoreStatusValue(double val);
extern const KeyRef healthyZoneKey;
extern const StringRef ignoreSSFailuresZoneString;
extern const KeyRef rebalanceDDIgnoreKey;

View File

@ -194,6 +194,40 @@ struct TagThrottleInfo {
}
};
struct ClientTagThrottleLimits {
double tpsRate;
double expiration;
ClientTagThrottleLimits() : tpsRate(0), expiration(0) {}
ClientTagThrottleLimits(double tpsRate, double expiration) : tpsRate(tpsRate), expiration(expiration) {}
template <class Archive>
void serialize(Archive& ar) {
// Convert expiration time to a duration to avoid clock differences
double duration = 0;
if (!ar.isDeserializing) {
duration = expiration - now();
}
serializer(ar, tpsRate, duration);
if (ar.isDeserializing) {
expiration = now() + duration;
}
}
};
struct ClientTrCommitCostEstimation {
int opsCount = 0;
uint64_t writeCosts = 0;
std::deque<std::pair<int, uint64_t>> clearIdxCosts;
uint32_t expensiveCostEstCount = 0;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, opsCount, writeCosts, clearIdxCosts, expensiveCostEstCount);
}
};
namespace ThrottleApi {
Future<std::vector<TagThrottleInfo>> getThrottledTags(Database const& db,
int const& limit,

View File

@ -474,7 +474,7 @@ void ThreadSafeApi::addNetworkThreadCompletionHook(void (*hook)(void*), void* ho
MutexHolder holder(lock); // We could use the network thread to protect this action, but then we can't guarantee
// upon return that the hook is set.
threadCompletionHooks.push_back(std::make_pair(hook, hookParameter));
threadCompletionHooks.emplace_back(hook, hookParameter);
}
IClientApi* ThreadSafeApi::api = new ThreadSafeApi();

View File

@ -856,7 +856,7 @@ void load_conf(const char* confpath, uid_t& uid, gid_t& gid, sigset_t* mask, fdb
if (id_command[i.first]->kill_on_configuration_change) {
kill_ids.push_back(i.first);
start_ids.push_back(std::make_pair(i.first, cmd));
start_ids.emplace_back(i.first, cmd);
}
} else {
log_msg(SevInfo, "Updated configuration for %s\n", id_command[i.first]->ssection.c_str());

View File

@ -24,7 +24,7 @@
void HealthMonitor::reportPeerClosed(const NetworkAddress& peerAddress) {
purgeOutdatedHistory();
peerClosedHistory.push_back(std::make_pair(now(), peerAddress));
peerClosedHistory.emplace_back(now(), peerAddress);
peerClosedNum[peerAddress] += 1;
}

View File

@ -20,6 +20,7 @@
#ifndef FDBRPC_STATS_H
#define FDBRPC_STATS_H
#include <type_traits>
#pragma once
// Yet another performance statistics interface
@ -136,7 +137,15 @@ struct SpecialCounter final : ICounter, FastAllocated<SpecialCounter<F>>, NonCop
void remove() override { delete this; }
std::string const& getName() const override { return name; }
int64_t getValue() const override { return f(); }
int64_t getValue() const override {
auto result = f();
// Disallow conversion from floating point to int64_t, since this has
// been a source of confusion - e.g. a percentage represented as a
// fraction between 0 and 1 is not meaningful after conversion to
// int64_t.
static_assert(!std::is_floating_point_v<decltype(result)>);
return result;
}
void resetInterval() override {}

View File

@ -25,6 +25,7 @@
#include "fdbclient/SystemData.h"
#include "fdbserver/BackupInterface.h"
#include "fdbserver/BackupProgress.actor.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/ServerDBInfo.h"

View File

@ -83,6 +83,8 @@ set(FDBSERVER_SRCS
RestoreLoader.actor.cpp
RestoreWorker.actor.h
RestoreWorker.actor.cpp
RestoreWorkerInterface.actor.cpp
RestoreWorkerInterface.actor.h
Resolver.actor.cpp
ResolverInterface.h
ServerDBInfo.actor.h

View File

@ -599,8 +599,8 @@ public:
std::vector<std::tuple<ProcessClass::Fitness, int, bool, int, Field>> orderedFields;
for (auto& it : fieldsWithMin) {
auto& fitness = field_fitness[it];
orderedFields.push_back(std::make_tuple(
std::get<0>(fitness), std::get<1>(fitness), std::get<2>(fitness), field_count[it], it));
orderedFields.emplace_back(
std::get<0>(fitness), std::get<1>(fitness), std::get<2>(fitness), field_count[it], it);
}
std::sort(orderedFields.begin(), orderedFields.end());
int totalFields = desired / minPerField;
@ -1692,20 +1692,37 @@ public:
if (req.configuration.regions.size() > 1) {
std::vector<RegionInfo> regions = req.configuration.regions;
if (regions[0].priority == regions[1].priority && regions[1].dcId == clusterControllerDcId.get()) {
TraceEvent("CCSwitchPrimaryDc", id)
.detail("CCDcId", clusterControllerDcId.get())
.detail("OldPrimaryDcId", regions[0].dcId)
.detail("NewPrimaryDcId", regions[1].dcId);
std::swap(regions[0], regions[1]);
}
if (regions[1].dcId == clusterControllerDcId.get() &&
(!versionDifferenceUpdated || datacenterVersionDifference >= SERVER_KNOBS->MAX_VERSION_DIFFERENCE)) {
if (regions[1].priority >= 0) {
TraceEvent("CCSwitchPrimaryDcVersionDifference", id)
.detail("CCDcId", clusterControllerDcId.get())
.detail("OldPrimaryDcId", regions[0].dcId)
.detail("NewPrimaryDcId", regions[1].dcId);
std::swap(regions[0], regions[1]);
} else {
TraceEvent(SevWarnAlways, "CCDcPriorityNegative")
.detail("DcId", regions[1].dcId)
.detail("Priority", regions[1].priority);
.detail("Priority", regions[1].priority)
.detail("FindWorkersInDc", regions[0].dcId)
.detail("Warning", "Failover did not happen but CC is in remote DC");
}
}
TraceEvent("CCFindWorkersForConfiguration", id)
.detail("CCDcId", clusterControllerDcId.get())
.detail("Region0DcId", regions[0].dcId)
.detail("Region1DcId", regions[1].dcId)
.detail("DatacenterVersionDifference", datacenterVersionDifference)
.detail("VersionDifferenceUpdated", versionDifferenceUpdated);
bool setPrimaryDesired = false;
try {
auto reply = findWorkersForConfigurationFromDC(req, regions[0].dcId);
@ -1719,6 +1736,10 @@ public:
} else if (regions[0].dcId == clusterControllerDcId.get()) {
return reply.get();
}
TraceEvent(SevWarn, "CCRecruitmentFailed", id)
.detail("Reason", "Recruited Txn system and CC are in different DCs")
.detail("CCDcId", clusterControllerDcId.get())
.detail("RecruitedTxnSystemDcId", regions[0].dcId);
throw no_more_servers();
} catch (Error& e) {
if (!goodRemoteRecruitmentTime.isReady() && regions[1].dcId != clusterControllerDcId.get()) {
@ -1728,7 +1749,9 @@ public:
if (e.code() != error_code_no_more_servers || regions[1].priority < 0) {
throw;
}
TraceEvent(SevWarn, "AttemptingRecruitmentInRemoteDC", id).error(e);
TraceEvent(SevWarn, "AttemptingRecruitmentInRemoteDc", id)
.detail("SetPrimaryDesired", setPrimaryDesired)
.error(e);
auto reply = findWorkersForConfigurationFromDC(req, regions[1].dcId);
if (!setPrimaryDesired) {
vector<Optional<Key>> dcPriority;

View File

@ -42,6 +42,7 @@
#include "fdbserver/ProxyCommitData.actor.h"
#include "fdbserver/RatekeeperInterface.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/RestoreUtil.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/ActorCollection.h"
@ -1460,7 +1461,7 @@ ACTOR static Future<Void> doKeyServerLocationRequest(GetKeyServerLocationsReques
ssis.push_back(it->interf);
maybeAddTssMapping(rep, commitData, tssMappingsIncluded, it->interf.id());
}
rep.results.push_back(std::make_pair(r.range(), ssis));
rep.results.emplace_back(r.range(), ssis);
} else if (!req.reverse) {
int count = 0;
for (auto r = commitData->keyInfo.rangeContaining(req.begin);
@ -1472,7 +1473,7 @@ ACTOR static Future<Void> doKeyServerLocationRequest(GetKeyServerLocationsReques
ssis.push_back(it->interf);
maybeAddTssMapping(rep, commitData, tssMappingsIncluded, it->interf.id());
}
rep.results.push_back(std::make_pair(r.range(), ssis));
rep.results.emplace_back(r.range(), ssis);
count++;
}
} else {
@ -1485,7 +1486,7 @@ ACTOR static Future<Void> doKeyServerLocationRequest(GetKeyServerLocationsReques
ssis.push_back(it->interf);
maybeAddTssMapping(rep, commitData, tssMappingsIncluded, it->interf.id());
}
rep.results.push_back(std::make_pair(r.range(), ssis));
rep.results.emplace_back(r.range(), ssis);
if (r == commitData->keyInfo.ranges().begin()) {
break;
}

View File

@ -51,9 +51,11 @@ class TCMachineTeamInfo;
ACTOR Future<Void> checkAndRemoveInvalidLocalityAddr(DDTeamCollection* self);
ACTOR Future<Void> removeWrongStoreType(DDTeamCollection* self);
ACTOR Future<Void> waitForAllDataRemoved(Database cx, UID serverID, Version addedVersion, DDTeamCollection* teams);
struct TCServerInfo : public ReferenceCounted<TCServerInfo> {
UID id;
Version addedVersion; // Read version when this Server is added
DDTeamCollection* collection;
StorageServerInterface lastKnownInterface;
ProcessClass lastKnownClass;
@ -82,10 +84,13 @@ struct TCServerInfo : public ReferenceCounted<TCServerInfo> {
DDTeamCollection* collection,
ProcessClass processClass,
bool inDesiredDC,
Reference<LocalitySet> storageServerSet)
Reference<LocalitySet> storageServerSet,
Version addedVersion = 0)
: id(ssi.id()), collection(collection), lastKnownInterface(ssi), lastKnownClass(processClass),
dataInFlightToServer(0), onInterfaceChanged(interfaceChanged.getFuture()), onRemoved(removed.getFuture()),
inDesiredDC(inDesiredDC), storeType(KeyValueStoreType::END), onTSSPairRemoved(Never()) {
inDesiredDC(inDesiredDC), storeType(KeyValueStoreType::END), onTSSPairRemoved(Never()),
addedVersion(addedVersion) {
if (!ssi.isTss()) {
localityEntry = ((LocalityMap<UID>*)storageServerSet.getPtr())->add(ssi.locality, &id);
}
@ -362,15 +367,17 @@ private:
};
struct ServerStatus {
bool isWiggling;
bool isFailed;
bool isUndesired;
bool isWrongConfiguration;
bool initialized; // AsyncMap erases default constructed objects
LocalityData locality;
ServerStatus() : isFailed(true), isUndesired(false), isWrongConfiguration(false), initialized(false) {}
ServerStatus()
: isWiggling(false), isFailed(true), isUndesired(false), isWrongConfiguration(false), initialized(false) {}
ServerStatus(bool isFailed, bool isUndesired, LocalityData const& locality)
: isFailed(isFailed), isUndesired(isUndesired), locality(locality), isWrongConfiguration(false),
initialized(true) {}
initialized(true), isWiggling(false) {}
bool isUnhealthy() const { return isFailed || isUndesired; }
const char* toString() const { return isFailed ? "Failed" : isUndesired ? "Undesired" : "Healthy"; }
@ -453,10 +460,10 @@ ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution(Data
for (int i = 0; i < serverList.get().size(); i++) {
auto ssi = decodeServerListValue(serverList.get()[i].value);
if (!ssi.isTss()) {
result->allServers.push_back(std::make_pair(ssi, id_data[ssi.locality.processId()].processClass));
result->allServers.emplace_back(ssi, id_data[ssi.locality.processId()].processClass);
server_dc[ssi.id()] = ssi.locality.dcId();
} else {
tss_servers.push_back(std::make_pair(ssi, id_data[ssi.locality.processId()].processClass));
tss_servers.emplace_back(ssi, id_data[ssi.locality.processId()].processClass);
}
}
@ -592,7 +599,7 @@ Future<Void> teamTracker(struct DDTeamCollection* const& self,
struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// clang-format off
enum { REQUESTING_WORKER = 0, GETTING_WORKER = 1, GETTING_STORAGE = 2 };
enum class Status { NONE = 0, EXCLUDED = 1, FAILED = 2 };
enum class Status { NONE = 0, WIGGLING = 1, EXCLUDED = 2, FAILED = 3};
// addActor: add to actorCollection so that when an actor has error, the ActorCollection can catch the error.
// addActor is used to create the actorCollection when the dataDistributionTeamCollection is created
@ -613,10 +620,12 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
int64_t unhealthyServers;
std::map<int,int> priority_teams;
std::map<UID, Reference<TCServerInfo>> server_info;
std::map<Key, std::vector<Reference<TCServerInfo>>> pid2server_info; // some process may serve as multiple storage servers
std::map<UID, Reference<TCServerInfo>> tss_info_by_pair;
std::map<UID, Reference<TCServerInfo>> server_and_tss_info; // TODO could replace this with an efficient way to do a read-only concatenation of 2 data structures?
std::map<Key, int> lagging_zones; // zone to number of storage servers lagging
AsyncVar<bool> disableFailingLaggingServers;
Optional<Key> wigglingPid; // Process id of current wiggling storage server;
// machine_info has all machines info; key must be unique across processes on the same machine
std::map<Standalone<StringRef>, Reference<TCMachineInfo>> machine_info;
@ -644,6 +653,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
bool isTssRecruiting; // If tss recruiting is waiting on a pair, don't consider DD recruiting for the purposes of QuietDB
// WIGGLING if an address is under storage wiggling.
// EXCLUDED if an address is in the excluded list in the database.
// FAILED if an address is permanently failed.
// NONE by default. Updated asynchronously (eventually)
@ -678,6 +688,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
AsyncTrigger printDetailedTeamsInfo;
PromiseStream<GetMetricsRequest> getShardMetrics;
PromiseStream<Promise<int>> getUnhealthyRelocationCount;
Promise<UID> removeFailedServer;
void resetLocalitySet() {
@ -717,7 +728,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
bool primary,
Reference<AsyncVar<bool>> processingUnhealthy,
PromiseStream<GetMetricsRequest> getShardMetrics,
Promise<UID> removeFailedServer)
Promise<UID> removeFailedServer,
PromiseStream<Promise<int>> getUnhealthyRelocationCount)
: cx(cx), distributorId(distributorId), lock(lock), output(output),
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), doBuildTeams(true), lastBuildTeamsFailed(false),
teamBuilder(Void()), badTeamRemover(Void()), checkInvalidLocalities(Void()), wrongStoreTypeRemover(Void()),
@ -732,7 +744,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
zeroHealthyTeams(zeroHealthyTeams), zeroOptimalTeams(true), primary(primary), isTssRecruiting(false),
medianAvailableSpace(SERVER_KNOBS->MIN_AVAILABLE_SPACE_RATIO), lastMedianAvailableSpaceUpdate(0),
processingUnhealthy(processingUnhealthy), lowestUtilizationTeam(0), highestUtilizationTeam(0),
getShardMetrics(getShardMetrics), removeFailedServer(removeFailedServer) {
getShardMetrics(getShardMetrics), removeFailedServer(removeFailedServer),
getUnhealthyRelocationCount(getUnhealthyRelocationCount) {
if (!primary || configuration.usableRegions == 1) {
TraceEvent("DDTrackerStarting", distributorId).detail("State", "Inactive").trackLatest("DDTrackerStarting");
}
@ -2458,7 +2471,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
processClass,
includedDCs.empty() ||
std::find(includedDCs.begin(), includedDCs.end(), newServer.locality.dcId()) != includedDCs.end(),
storageServerSet);
storageServerSet,
addedVersion);
if (newServer.isTss()) {
tss_info_by_pair[newServer.tssPairID.get()] = r;
@ -2470,6 +2484,10 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
server_info[newServer.id()] = r;
// Establish the relation between server and machine
checkAndCreateMachine(r);
// Add storage server to pid map
ASSERT(r->lastKnownInterface.locality.processId().present());
StringRef pid = r->lastKnownInterface.locality.processId().get();
pid2server_info[pid].push_back(r);
}
r->tracker =
@ -2668,6 +2686,19 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// ASSERT( !shardsAffectedByTeamFailure->getServersForTeam( t ) for all t in teams that contain removedServer )
Reference<TCServerInfo> removedServerInfo = server_info[removedServer];
// Step: Remove TCServerInfo from pid2server_info
ASSERT(removedServerInfo->lastKnownInterface.locality.processId().present());
StringRef pid = removedServerInfo->lastKnownInterface.locality.processId().get();
auto& info_vec = pid2server_info[pid];
for (size_t i = 0; i < info_vec.size(); ++i) {
if (info_vec[i] == removedServerInfo) {
info_vec[i--] = info_vec.back();
info_vec.pop_back();
}
}
if (info_vec.size() == 0) {
pid2server_info.erase(pid);
}
// Step: Remove server team that relate to removedServer
// Find all servers with which the removedServer shares teams
@ -2782,6 +2813,47 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
.detail("MachineTeams", machineTeams.size())
.detail("DesiredTeamsPerServer", SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER);
}
// Adds storage servers held on process of which the Process Id is “pid” into excludeServers which prevent
// recruiting the wiggling storage servers and let teamTracker start to move data off the affected teams;
// Return a vector of futures wait for all data is moved to other teams.
std::vector<Future<Void>> excludeStorageServersForWiggle(const Value& pid) {
std::vector<Future<Void>> moveFutures;
if (this->pid2server_info.count(pid) != 0) {
for (auto& info : this->pid2server_info[pid]) {
AddressExclusion addr(info->lastKnownInterface.address().ip);
if (this->excludedServers.count(addr) &&
this->excludedServers.get(addr) != DDTeamCollection::Status::NONE) {
continue; // don't overwrite the value set by actor trackExcludedServer
}
this->excludedServers.set(addr, DDTeamCollection::Status::WIGGLING);
moveFutures.push_back(
waitForAllDataRemoved(this->cx, info->lastKnownInterface.id(), info->addedVersion, this));
}
if (!moveFutures.empty()) {
this->restartRecruiting.trigger();
}
}
return moveFutures;
}
// Include storage servers held on process of which the Process Id is “pid” by setting their status from `WIGGLING`
// to `NONE`. The storage recruiter will recruit them as new storage servers
void includeStorageServersForWiggle(const Value& pid) {
bool included = false;
for (auto& info : this->pid2server_info[pid]) {
AddressExclusion addr(info->lastKnownInterface.address().ip);
if (!this->excludedServers.count(addr) ||
this->excludedServers.get(addr) != DDTeamCollection::Status::WIGGLING) {
continue;
}
included = true;
this->excludedServers.set(addr, DDTeamCollection::Status::NONE);
}
if (included) {
this->restartRecruiting.trigger();
}
}
};
TCServerInfo::~TCServerInfo() {
@ -3390,6 +3462,7 @@ bool teamContainsFailedServer(DDTeamCollection* self, Reference<TCTeamInfo> team
ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> team, bool badTeam, bool redundantTeam) {
state int lastServersLeft = team->size();
state bool lastAnyUndesired = false;
state bool lastAnyWigglingServer = false;
state bool logTeamEvents =
g_network->isSimulated() || !badTeam || team->size() <= self->configuration.storageTeamSize;
state bool lastReady = false;
@ -3423,6 +3496,7 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
state vector<Future<Void>> change;
bool anyUndesired = false;
bool anyWrongConfiguration = false;
bool anyWigglingServer = false;
int serversLeft = 0;
for (const UID& uid : team->getServerIDs()) {
@ -3437,6 +3511,9 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
if (status.isWrongConfiguration) {
anyWrongConfiguration = true;
}
if (status.isWiggling) {
anyWigglingServer = true;
}
}
if (serversLeft == 0) {
@ -3454,7 +3531,8 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
}
change.push_back(self->zeroHealthyTeams->onChange());
bool healthy = !badTeam && !anyUndesired && serversLeft == self->configuration.storageTeamSize;
bool healthy =
!badTeam && !anyUndesired && serversLeft == self->configuration.storageTeamSize && !anyWigglingServer;
team->setHealthy(healthy); // Unhealthy teams won't be chosen by bestTeam
bool optimal = team->isOptimal() && healthy;
bool containsFailed = teamContainsFailedServer(self, team);
@ -3493,13 +3571,15 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
}
if (serversLeft != lastServersLeft || anyUndesired != lastAnyUndesired ||
anyWrongConfiguration != lastWrongConfiguration || recheck) { // NOTE: do not check wrongSize
anyWrongConfiguration != lastWrongConfiguration || anyWigglingServer != lastAnyWigglingServer ||
recheck) { // NOTE: do not check wrongSize
if (logTeamEvents) {
TraceEvent("ServerTeamHealthChanged", self->distributorId)
.detail("ServerTeam", team->getDesc())
.detail("ServersLeft", serversLeft)
.detail("LastServersLeft", lastServersLeft)
.detail("ContainsUndesiredServer", anyUndesired)
.detail("ContainsWigglingServer", anyWigglingServer)
.detail("HealthyTeamsCount", self->healthyTeamCount)
.detail("IsWrongConfiguration", anyWrongConfiguration);
}
@ -3541,6 +3621,7 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
lastServersLeft = serversLeft;
lastAnyUndesired = anyUndesired;
lastWrongConfiguration = anyWrongConfiguration;
lastAnyWigglingServer = anyWigglingServer;
state int lastPriority = team->getPriority();
if (team->size() == 0) {
@ -3562,6 +3643,8 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
}
} else if (anyUndesired) {
team->setPriority(SERVER_KNOBS->PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER);
} else if (anyWigglingServer) {
team->setPriority(SERVER_KNOBS->PRIORITY_PERPETUAL_STORAGE_WIGGLE);
} else {
team->setPriority(SERVER_KNOBS->PRIORITY_TEAM_HEALTHY);
}
@ -3593,6 +3676,7 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
lastZeroHealthy =
self->zeroHealthyTeams->get(); // set this again in case it changed from this teams health changing
if ((self->initialFailureReactionDelay.isReady() && !self->zeroHealthyTeams->get()) || containsFailed) {
vector<KeyRange> shards = self->shardsAffectedByTeamFailure->getShardsFor(
ShardsAffectedByTeamFailure::Team(team->getServerIDs(), self->primary));
@ -3790,12 +3874,254 @@ ACTOR Future<vector<std::pair<StorageServerInterface, ProcessClass>>> getServerL
vector<std::pair<StorageServerInterface, ProcessClass>> results;
for (int i = 0; i < serverList.get().size(); i++) {
auto ssi = decodeServerListValue(serverList.get()[i].value);
results.push_back(std::make_pair(ssi, id_data[ssi.locality.processId()].processClass));
results.emplace_back(ssi, id_data[ssi.locality.processId()].processClass);
}
return results;
}
// Create a transaction reading the value of `wigglingStorageServerKey` and update it to the next Process ID according
// to a sorted PID set maintained by the data distributor. If now no storage server exists, the new Process ID is 0.
ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection) {
state ReadYourWritesTransaction tr(teamCollection->cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
Optional<Value> value = wait(tr.get(wigglingStorageServerKey));
if (teamCollection->pid2server_info.empty()) {
tr.set(wigglingStorageServerKey, LiteralStringRef("0"));
} else {
Value pid = teamCollection->pid2server_info.begin()->first;
if (value.present()) {
auto nextIt = teamCollection->pid2server_info.upper_bound(value.get());
if (nextIt == teamCollection->pid2server_info.end()) {
tr.set(wigglingStorageServerKey, pid);
} else {
tr.set(wigglingStorageServerKey, nextIt->first);
}
} else {
tr.set(wigglingStorageServerKey, pid);
}
}
wait(tr.commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
return Void();
}
// Iterate over each storage process to do storage wiggle. After initializing the first Process ID, it waits a signal
// from `perpetualStorageWiggler` indicating the wiggling of current process is finished. Then it writes the next
// Process ID to a system key: `wigglingStorageServerKey` to show the next process to wiggle.
ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncTrigger* stopSignal,
FutureStream<Void> finishStorageWiggleSignal,
DDTeamCollection* teamCollection) {
// initialize PID
wait(updateNextWigglingStoragePID(teamCollection));
loop choose {
when(wait(stopSignal->onTrigger())) { break; }
when(waitNext(finishStorageWiggleSignal)) { wait(updateNextWigglingStoragePID(teamCollection)); }
}
return Void();
}
// Watch the value change of `wigglingStorageServerKey`.
// Return the watch future and the current value of `wigglingStorageServerKey`.
ACTOR Future<std::pair<Future<Void>, Value>> watchPerpetualStoragePIDChange(Database cx) {
state ReadYourWritesTransaction tr(cx);
state Future<Void> watchFuture;
state Value ret;
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
Optional<Value> value = wait(tr.get(wigglingStorageServerKey));
if (value.present()) {
ret = value.get();
}
watchFuture = tr.watch(wigglingStorageServerKey);
wait(tr.commit());
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
return std::make_pair(watchFuture, ret);
}
// Watches the value (pid) change of \xff/storageWigglePID, and adds storage servers held on process of which the
// Process Id is “pid” into excludeServers which prevent recruiting the wiggling storage servers and let teamTracker
// start to move data off the affected teams. The wiggling process of current storage servers will be paused if the
// cluster is unhealthy and restarted once the cluster is healthy again.
ACTOR Future<Void> perpetualStorageWiggler(AsyncTrigger* stopSignal,
PromiseStream<Void> finishStorageWiggleSignal,
DDTeamCollection* self,
const DDEnabledState* ddEnabledState) {
state Future<Void> watchFuture;
state Future<Void> moveFinishFuture = Never();
state Debouncer pauseWiggle(SERVER_KNOBS->DEBOUNCE_RECRUITING_DELAY);
state AsyncTrigger restart;
state Future<Void> ddQueueCheck =
delay(SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY, TaskPriority::DataDistributionLow);
state int movingCount = 0;
state bool isPaused = false;
state std::pair<Future<Void>, Value> res = wait(watchPerpetualStoragePIDChange(self->cx));
watchFuture = res.first;
self->wigglingPid = Optional<Key>(res.second);
// start with the initial pid
if (self->healthyTeamCount > 1) { // pre-check health status
TEST(true); // start the first wiggling
auto fv = self->excludeStorageServersForWiggle(self->wigglingPid.get());
movingCount = fv.size();
moveFinishFuture = waitForAll(fv);
TraceEvent("PerpetualStorageWiggleInitialStart", self->distributorId)
.detail("ProcessId", self->wigglingPid.get())
.detail("StorageCount", movingCount);
} else {
isPaused = true;
TraceEvent("PerpetualStorageWiggleInitialPause", self->distributorId)
.detail("ProcessId", self->wigglingPid.get());
}
loop {
choose {
when(wait(stopSignal->onTrigger())) { break; }
when(wait(watchFuture)) {
// read new pid and set the next watch Future
wait(store(res, watchPerpetualStoragePIDChange(self->cx)));
watchFuture = res.first;
self->wigglingPid = Optional<Key>(res.second);
StringRef pid = self->wigglingPid.get();
if (self->healthyTeamCount <= 1) { // pre-check health status
pauseWiggle.trigger();
} else {
TEST(true); // start wiggling
auto fv = self->excludeStorageServersForWiggle(pid);
movingCount = fv.size();
moveFinishFuture = waitForAll(fv);
TraceEvent("PerpetualStorageWiggleStart", self->distributorId)
.detail("ProcessId", pid)
.detail("StorageCount", movingCount);
}
}
when(wait(restart.onTrigger())) {
if (self->wigglingPid.present()) {
TEST(true); // restart paused wiggling
StringRef pid = self->wigglingPid.get();
auto fv = self->excludeStorageServersForWiggle(pid);
moveFinishFuture = waitForAll(fv);
TraceEvent("PerpetualStorageWiggleRestart", self->distributorId)
.detail("ProcessId", pid)
.detail("StorageCount", fv.size());
isPaused = false;
}
}
when(wait(moveFinishFuture)) {
TEST(true); // finish wiggling this process
ASSERT(self->wigglingPid.present());
StringRef pid = self->wigglingPid.get();
moveFinishFuture = Never();
self->includeStorageServersForWiggle(pid);
TraceEvent("PerpetualStorageWiggleFinish", self->distributorId)
.detail("ProcessId", pid.toString())
.detail("StorageCount", movingCount);
self->wigglingPid.reset();
finishStorageWiggleSignal.send(Void());
}
when(wait(self->zeroHealthyTeams->onChange())) {
if (self->zeroHealthyTeams->get() && !isPaused) {
pauseWiggle.trigger();
}
}
when(wait(ddQueueCheck)) { // check health status periodically
Promise<int> countp;
self->getUnhealthyRelocationCount.send(countp);
int count = wait(countp.getFuture());
if (count >= SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD && !isPaused) {
pauseWiggle.trigger();
} else if (count < SERVER_KNOBS->DD_STORAGE_WIGGLE_PAUSE_THRESHOLD && self->healthyTeamCount > 1 &&
isPaused) {
restart.trigger();
}
ddQueueCheck = delay(SERVER_KNOBS->DD_ZERO_HEALTHY_TEAM_DELAY, TaskPriority::DataDistributionLow);
}
when(wait(pauseWiggle.onTrigger())) {
if (self->wigglingPid.present()) {
TEST(true); // paused because cluster is unhealthy
StringRef pid = self->wigglingPid.get();
isPaused = true;
moveFinishFuture = Never();
self->includeStorageServersForWiggle(pid);
TraceEvent("PerpetualStorageWigglePause", self->distributorId)
.detail("ProcessId", pid)
.detail("StorageCount", movingCount);
}
}
}
}
if (self->wigglingPid.present()) {
self->includeStorageServersForWiggle(self->wigglingPid.get());
self->wigglingPid.reset();
}
return Void();
}
// This coroutine sets a watch to monitor the value change of `perpetualStorageWiggleKey` which is controlled by command
// `configure perpetual_storage_wiggle=$value` if the value is 1, this actor start 2 actors,
// `perpetualStorageWiggleIterator` and `perpetualStorageWiggler`. Otherwise, it sends stop signal to them.
ACTOR Future<Void> monitorPerpetualStorageWiggle(DDTeamCollection* teamCollection,
const DDEnabledState* ddEnabledState) {
state int speed = 0;
state AsyncTrigger stopWiggleSignal;
state PromiseStream<Void> finishStorageWiggleSignal;
state SignalableActorCollection collection;
loop {
state ReadYourWritesTransaction tr(teamCollection->cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
Optional<Standalone<StringRef>> value = wait(tr.get(perpetualStorageWiggleKey));
if (value.present()) {
speed = std::stoi(value.get().toString());
}
state Future<Void> watchFuture = tr.watch(perpetualStorageWiggleKey);
wait(tr.commit());
ASSERT(speed == 1 || speed == 0);
if (speed == 1) {
collection.add(perpetualStorageWiggleIterator(
&stopWiggleSignal, finishStorageWiggleSignal.getFuture(), teamCollection));
collection.add(perpetualStorageWiggler(
&stopWiggleSignal, finishStorageWiggleSignal, teamCollection, ddEnabledState));
TraceEvent("PerpetualStorageWiggleOpen", teamCollection->distributorId);
} else {
stopWiggleSignal.trigger();
wait(collection.signalAndReset());
TraceEvent("PerpetualStorageWiggleClose", teamCollection->distributorId);
}
wait(watchFuture);
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
}
// The serverList system keyspace keeps the StorageServerInterface for each serverID. Storage server's storeType
// and serverID are decided by the server's filename. By parsing storage server file's filename on each disk, process on
// each machine creates the TCServer with the correct serverID and StorageServerInterface.
@ -4212,7 +4538,13 @@ ACTOR Future<Void> storageServerTracker(
.detail("Excluded", worstAddr.toString());
status.isUndesired = true;
status.isWrongConfiguration = true;
if (worstStatus == DDTeamCollection::Status::FAILED && !isTss) {
if (worstStatus == DDTeamCollection::Status::WIGGLING && !isTss) {
status.isWiggling = true;
TraceEvent("PerpetualWigglingStorageServer", self->distributorId)
.detail("Server", server->id)
.detail("Address", worstAddr.toString());
} else if (worstStatus == DDTeamCollection::Status::FAILED && !isTss) {
TraceEvent(SevWarn, "FailedServerRemoveKeys", self->distributorId)
.detail("Server", server->id)
.detail("Excluded", worstAddr.toString());
@ -5130,6 +5462,7 @@ ACTOR Future<Void> dataDistributionTeamCollection(Reference<DDTeamCollection> te
self->addActor.send(trackExcludedServers(self));
self->addActor.send(monitorHealthyTeams(self));
self->addActor.send(waitHealthyZoneChange(self));
self->addActor.send(monitorPerpetualStorageWiggle(self, ddEnabledState));
// SOMEDAY: Monitor FF/serverList for (new) servers that aren't in allServers and add or remove them
@ -5493,6 +5826,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
state PromiseStream<RelocateShard> output;
state PromiseStream<RelocateShard> input;
state PromiseStream<Promise<int64_t>> getAverageShardBytes;
state PromiseStream<Promise<int>> getUnhealthyRelocationCount;
state PromiseStream<GetMetricsRequest> getShardMetrics;
state Reference<AsyncVar<bool>> processingUnhealthy(new AsyncVar<bool>(false));
state Promise<Void> readyToStart;
@ -5576,6 +5910,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
shardsAffectedByTeamFailure,
lock,
getAverageShardBytes,
getUnhealthyRelocationCount,
self->ddId,
storageTeamSize,
configuration.storageTeamSize,
@ -5600,7 +5935,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
true,
processingUnhealthy,
getShardMetrics,
removeFailedServer);
removeFailedServer,
getUnhealthyRelocationCount);
teamCollectionsPtrs.push_back(primaryTeamCollection.getPtr());
if (configuration.usableRegions > 1) {
remoteTeamCollection =
@ -5617,7 +5953,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
false,
processingUnhealthy,
getShardMetrics,
removeFailedServer);
removeFailedServer,
getUnhealthyRelocationCount);
teamCollectionsPtrs.push_back(remoteTeamCollection.getPtr());
remoteTeamCollection->teamCollections = teamCollectionsPtrs;
actors.push_back(
@ -6091,7 +6428,8 @@ std::unique_ptr<DDTeamCollection> testTeamCollection(int teamSize,
true,
makeReference<AsyncVar<bool>>(false),
PromiseStream<GetMetricsRequest>(),
Promise<UID>()));
Promise<UID>(),
PromiseStream<Promise<int>>()));
for (int id = 1; id <= processCount; ++id) {
UID uid(id, 0);
@ -6133,7 +6471,8 @@ std::unique_ptr<DDTeamCollection> testMachineTeamCollection(int teamSize,
true,
makeReference<AsyncVar<bool>>(false),
PromiseStream<GetMetricsRequest>(),
Promise<UID>()));
Promise<UID>(),
PromiseStream<Promise<int>>()));
for (int id = 1; id <= processCount; id++) {
UID uid(id, 0);
@ -6310,4 +6649,4 @@ TEST_CASE("/DataDistribution/AddTeamsBestOf/NotEnoughServers") {
ASSERT(result == 8);
return Void();
}
}

View File

@ -263,6 +263,7 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
MoveKeysLock lock,
PromiseStream<Promise<int64_t>> getAverageShardBytes,
PromiseStream<Promise<int>> getUnhealthyRelocationCount,
UID distributorId,
int teamSize,
int singleRegionTeamSize,

View File

@ -1032,7 +1032,7 @@ ACTOR Future<Void> dataDistributionRelocator(DDQueueData* self, RelocateData rd,
anyWithSource = true;
}
bestTeams.push_back(std::make_pair(bestTeam.first.get(), bestTeam.second));
bestTeams.emplace_back(bestTeam.first.get(), bestTeam.second);
tciIndex++;
}
if (foundTeams && anyHealthy) {
@ -1550,6 +1550,7 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
MoveKeysLock lock,
PromiseStream<Promise<int64_t>> getAverageShardBytes,
PromiseStream<Promise<int>> getUnhealthyRelocationCount,
UID distributorId,
int teamSize,
int singleRegionTeamSize,
@ -1679,6 +1680,9 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
}
when(wait(self.error.getFuture())) {} // Propagate errors from dataDistributionRelocator
when(wait(waitForAll(balancingFutures))) {}
when(Promise<int> r = waitNext(getUnhealthyRelocationCount.getFuture())) {
r.send(self.unhealthyRelocations);
}
}
}
} catch (Error& e) {

View File

@ -176,8 +176,8 @@ ShardSizeBounds getShardSizeBounds(KeyRangeRef shard, int64_t maxShardSize) {
}
int64_t getMaxShardSize(double dbSizeEstimate) {
return std::min((SERVER_KNOBS->MIN_SHARD_BYTES +
(int64_t)std::sqrt(dbSizeEstimate) * SERVER_KNOBS->SHARD_BYTES_PER_SQRT_BYTES) *
return std::min((SERVER_KNOBS->MIN_SHARD_BYTES + (int64_t)std::sqrt(std::max<double>(dbSizeEstimate, 0)) *
SERVER_KNOBS->SHARD_BYTES_PER_SQRT_BYTES) *
SERVER_KNOBS->SHARD_BYTES_RATIO,
(int64_t)SERVER_KNOBS->MAX_SHARD_BYTES);
}

View File

@ -832,7 +832,7 @@ public:
int count = end - begin;
numItems = count;
nodeBytesDeleted = 0;
initialHeight = (uint8_t)log2(count) + 1;
initialHeight = count ? (uint8_t)log2(count) + 1 : 0;
maxHeight = 0;
// The boundary leading to the new page acts as the last time we branched right

View File

@ -109,15 +109,18 @@ struct GrvProxyStats {
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
grvLatencyBands("GRVLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
// The rate at which the limit(budget) is allowed to grow.
specialCounter(cc, "SystemAndDefaultTxnRateAllowed", [this]() { return this->transactionRateAllowed; });
specialCounter(cc, "BatchTransactionRateAllowed", [this]() { return this->batchTransactionRateAllowed; });
specialCounter(cc, "SystemAndDefaultTxnLimit", [this]() { return this->transactionLimit; });
specialCounter(cc, "BatchTransactionLimit", [this]() { return this->batchTransactionLimit; });
specialCounter(cc, "PercentageOfDefaultGRVQueueProcessed", [this]() {
return this->percentageOfDefaultGRVQueueProcessed;
});
specialCounter(
cc, "PercentageOfBatchGRVQueueProcessed", [this]() { return this->percentageOfBatchGRVQueueProcessed; });
cc, "SystemAndDefaultTxnRateAllowed", [this]() { return int64_t(this->transactionRateAllowed); });
specialCounter(
cc, "BatchTransactionRateAllowed", [this]() { return int64_t(this->batchTransactionRateAllowed); });
specialCounter(cc, "SystemAndDefaultTxnLimit", [this]() { return int64_t(this->transactionLimit); });
specialCounter(cc, "BatchTransactionLimit", [this]() { return int64_t(this->batchTransactionLimit); });
specialCounter(cc, "PercentageOfDefaultGRVQueueProcessed", [this]() {
return int64_t(100 * this->percentageOfDefaultGRVQueueProcessed);
});
specialCounter(cc, "PercentageOfBatchGRVQueueProcessed", [this]() {
return int64_t(100 * this->percentageOfBatchGRVQueueProcessed);
});
logger = traceCounters("GrvProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "GrvProxyMetrics");
for (int i = 0; i < FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS; i++) {
@ -831,8 +834,10 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
}
span = Span(span.location);
grvProxyData->stats.percentageOfDefaultGRVQueueProcessed = (double)defaultGRVProcessed / defaultQueueSize;
grvProxyData->stats.percentageOfBatchGRVQueueProcessed = (double)batchGRVProcessed / batchQueueSize;
grvProxyData->stats.percentageOfDefaultGRVQueueProcessed =
defaultQueueSize ? (double)defaultGRVProcessed / defaultQueueSize : 1;
grvProxyData->stats.percentageOfBatchGRVQueueProcessed =
batchQueueSize ? (double)batchGRVProcessed / batchQueueSize : 1;
}
}

View File

@ -401,7 +401,7 @@ private:
if (o->op == OpSet) {
if (sequential) {
KeyValueMapPair pair(o->p1, o->p2);
dataSets.push_back(std::make_pair(pair, pair.arena.getSize() + data.getElementBytes()));
dataSets.emplace_back(pair, pair.arena.getSize() + data.getElementBytes());
} else {
data.insert(o->p1, o->p2);
}

View File

@ -131,6 +131,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( PRIORITY_RECOVER_MOVE, 110 );
init( PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, 120 );
init( PRIORITY_REBALANCE_OVERUTILIZED_TEAM, 121 );
init( PRIORITY_PERPETUAL_STORAGE_WIGGLE, 140 );
init( PRIORITY_TEAM_HEALTHY, 140 );
init( PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER, 150 );
init( PRIORITY_TEAM_REDUNDANT, 200 );
@ -253,6 +254,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( DD_TEAMS_INFO_PRINT_INTERVAL, 60 ); if( randomize && BUGGIFY ) DD_TEAMS_INFO_PRINT_INTERVAL = 10;
init( DD_TEAMS_INFO_PRINT_YIELD_COUNT, 100 ); if( randomize && BUGGIFY ) DD_TEAMS_INFO_PRINT_YIELD_COUNT = deterministicRandom()->random01() * 1000 + 1;
init( DD_TEAM_ZERO_SERVER_LEFT_LOG_DELAY, 120 ); if( randomize && BUGGIFY ) DD_TEAM_ZERO_SERVER_LEFT_LOG_DELAY = 5;
init( DD_STORAGE_WIGGLE_PAUSE_THRESHOLD, 1 ); if( randomize && BUGGIFY ) DD_STORAGE_WIGGLE_PAUSE_THRESHOLD = 10;
// TeamRemover
init( TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER, false ); if( randomize && BUGGIFY ) TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER = deterministicRandom()->random01() < 0.1 ? true : false; // false by default. disable the consistency check when it's true

View File

@ -133,6 +133,7 @@ public:
int PRIORITY_RECOVER_MOVE;
int PRIORITY_REBALANCE_UNDERUTILIZED_TEAM;
int PRIORITY_REBALANCE_OVERUTILIZED_TEAM;
int PRIORITY_PERPETUAL_STORAGE_WIGGLE;
int PRIORITY_TEAM_HEALTHY;
int PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER;
int PRIORITY_TEAM_REDUNDANT;
@ -203,6 +204,7 @@ public:
int DD_TEAMS_INFO_PRINT_INTERVAL;
int DD_TEAMS_INFO_PRINT_YIELD_COUNT;
int DD_TEAM_ZERO_SERVER_LEFT_LOG_DELAY;
int DD_STORAGE_WIGGLE_PAUSE_THRESHOLD; // How many unhealthy relocations are ongoing will pause storage wiggle
// TeamRemover to remove redundant teams
bool TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER; // disable the machineTeamRemover actor

View File

@ -21,6 +21,7 @@
#include "fdbrpc/FailureMonitor.h"
#include "fdbrpc/Locality.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbserver/Knobs.h"
#include "fdbclient/MonitorLeader.h"
#include "flow/actorcompiler.h" // This must be the last #include.

View File

@ -175,22 +175,22 @@ struct LogRouterData {
specialCounter(cc, "WaitForVersionMS", [this]() {
double val = this->waitForVersionTime;
this->waitForVersionTime = 0;
return 1000 * val;
return int64_t(1000 * val);
});
specialCounter(cc, "WaitForVersionMaxMS", [this]() {
double val = this->maxWaitForVersionTime;
this->maxWaitForVersionTime = 0;
return 1000 * val;
return int64_t(1000 * val);
});
specialCounter(cc, "GetMoreMS", [this]() {
double val = this->getMoreTime;
this->getMoreTime = 0;
return 1000 * val;
return int64_t(1000 * val);
});
specialCounter(cc, "GetMoreMaxMS", [this]() {
double val = this->maxGetMoreTime;
this->maxGetMoreTime = 0;
return 1000 * val;
return int64_t(1000 * val);
});
specialCounter(cc, "Generation", [this]() { return this->generation; });
logger = traceCounters("LogRouterMetrics",

View File

@ -838,7 +838,7 @@ void commitMessages(Reference<LogData> self,
TEST(true); // Splitting commit messages across multiple blocks
messages1 = StringRef(block.end(), bytes);
block.append(block.arena(), messages.begin(), bytes);
self->messageBlocks.push_back(std::make_pair(version, block));
self->messageBlocks.emplace_back(version, block);
addedBytes += int64_t(block.size()) * SERVER_KNOBS->TLOG_MESSAGE_BLOCK_OVERHEAD_FACTOR;
messages = messages.substr(bytes);
}
@ -851,7 +851,7 @@ void commitMessages(Reference<LogData> self,
// Copy messages into block
ASSERT(messages.size() <= block.capacity() - block.size());
block.append(block.arena(), messages.begin(), messages.size());
self->messageBlocks.push_back(std::make_pair(version, block));
self->messageBlocks.emplace_back(version, block);
addedBytes += int64_t(block.size()) * SERVER_KNOBS->TLOG_MESSAGE_BLOCK_OVERHEAD_FACTOR;
messages = StringRef(block.end() - messages.size(), messages.size());
@ -869,7 +869,7 @@ void commitMessages(Reference<LogData> self,
int offs = tag->messageOffsets[m];
uint8_t const* p =
offs < messages1.size() ? messages1.begin() + offs : messages.begin() + offs - messages1.size();
tsm->value.version_messages.push_back(std::make_pair(version, LengthPrefixedStringRef((uint32_t*)p)));
tsm->value.version_messages.emplace_back(version, LengthPrefixedStringRef((uint32_t*)p));
if (tsm->value.version_messages.back().second.expectedSize() > SERVER_KNOBS->MAX_MESSAGE_SIZE) {
TraceEvent(SevWarnAlways, "LargeMessage")
.detail("Size", tsm->value.version_messages.back().second.expectedSize());

View File

@ -26,6 +26,7 @@
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/RunTransaction.actor.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/ServerDBInfo.h"
@ -598,6 +599,10 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
if (g_network->isSimulated())
wait(delay(5.0));
// The quiet database check (which runs at the end of every test) will always time out due to active data movement.
// To get around this, quiet Database will disable the perpetual wiggle in the setup phase.
wait(setPerpetualStorageWiggle(cx, false, true));
// Require 3 consecutive successful quiet database checks spaced 2 second apart
state int numSuccesses = 0;

View File

@ -54,7 +54,9 @@ StringRef radix_join(const StringRef& key1, const StringRef& key2, Arena& arena)
uint8_t* s = new (arena) uint8_t[rsize];
memcpy(s, key1.begin(), key1.size());
memcpy(s + key1.size(), key2.begin(), key2.size());
if (key2.size() > 0) {
memcpy(s + key1.size(), key2.begin(), key2.size());
}
return StringRef(s, rsize);
}
@ -591,7 +593,9 @@ StringRef radix_tree::iterator::getKey(uint8_t* content) const {
auto node = m_pointee;
uint32_t pos = m_pointee->m_depth;
while (true) {
memcpy(content + pos, node->getKey().begin(), node->getKeySize());
if (node->getKeySize() > 0) {
memcpy(content + pos, node->getKey().begin(), node->getKeySize());
}
node = node->m_parent;
if (node == nullptr || pos <= 0)
break;

View File

@ -21,6 +21,7 @@
#ifndef FDBSERVER_RATEKEEPERINTERFACE_H
#define FDBSERVER_RATEKEEPERINTERFACE_H
#include "fdbclient/CommitProxyInterface.h"
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/Locality.h"
@ -49,29 +50,6 @@ struct RatekeeperInterface {
}
};
struct ClientTagThrottleLimits {
double tpsRate;
double expiration;
ClientTagThrottleLimits() : tpsRate(0), expiration(0) {}
ClientTagThrottleLimits(double tpsRate, double expiration) : tpsRate(tpsRate), expiration(expiration) {}
template <class Archive>
void serialize(Archive& ar) {
// Convert expiration time to a duration to avoid clock differences
double duration = 0;
if (!ar.isDeserializing) {
duration = expiration - now();
}
serializer(ar, tpsRate, duration);
if (ar.isDeserializing) {
expiration = now() + duration;
}
}
};
struct TransactionCommitCostEstimation {
int opsSum = 0;
uint64_t costSum = 0;
@ -91,17 +69,6 @@ struct TransactionCommitCostEstimation {
}
};
struct ClientTrCommitCostEstimation {
int opsCount = 0;
uint64_t writeCosts = 0;
std::deque<std::pair<int, uint64_t>> clearIdxCosts;
uint32_t expensiveCostEstCount = 0;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, opsCount, writeCosts, clearIdxCosts, expensiveCostEstCount);
}
};
struct GetRateInfoReply {
constexpr static FileIdentifier file_identifier = 7845006;
double transactionRate;

View File

@ -233,7 +233,7 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self, ResolveTransactionBatc
self->resolvedStateBytes += stateBytes;
if (stateBytes > 0)
self->recentStateTransactionSizes.push_back(std::make_pair(req.version, stateBytes));
self->recentStateTransactionSizes.emplace_back(req.version, stateBytes);
ASSERT(req.version >= firstUnseenVersion);
ASSERT(firstUnseenVersion >= self->debugMinRecentStateVersion);

View File

@ -35,10 +35,10 @@
#include "fdbrpc/Locality.h"
#include "fdbrpc/Stats.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbclient/RestoreWorkerInterface.actor.h"
#include "fdbserver/MutationTracking.h"
#include "fdbserver/RestoreUtil.h"
#include "fdbserver/RestoreRoleCommon.actor.h"
#include "fdbserver/RestoreWorkerInterface.actor.h"
#include "flow/actorcompiler.h" // has to be last include

View File

@ -35,6 +35,7 @@
#include "fdbclient/NativeAPI.actor.h"
#include "fdbrpc/IAsyncFile.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbserver/Knobs.h"
#include "flow/actorcompiler.h" // has to be last include
@ -394,4 +395,4 @@ Future<Void> sendBatchRequests(RequestStream<Request> Interface::*channel,
}
#include "flow/unactorcompiler.h"
#endif // FDBSERVER_RESTORECOMMON_ACTOR_H
#endif // FDBSERVER_RESTORECOMMON_ACTOR_H

View File

@ -34,10 +34,10 @@
#include "fdbrpc/Stats.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbrpc/Locality.h"
#include "fdbclient/RestoreWorkerInterface.actor.h"
#include "fdbserver/RestoreUtil.h"
#include "fdbserver/RestoreCommon.actor.h"
#include "fdbserver/RestoreRoleCommon.actor.h"
#include "fdbserver/RestoreWorkerInterface.actor.h"
#include "fdbclient/BackupContainer.h"
#include "flow/actorcompiler.h" // has to be last include

View File

@ -37,7 +37,7 @@
#include "fdbrpc/Locality.h"
#include "fdbrpc/Stats.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbclient/RestoreWorkerInterface.actor.h"
#include "fdbserver/RestoreWorkerInterface.actor.h"
#include "fdbserver/RestoreUtil.h"
#include "flow/actorcompiler.h" // has to be last include

View File

@ -28,6 +28,7 @@
#include "fdbclient/Tuple.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/RestoreInterface.h"
#include "flow/flow.h"
#include "fdbrpc/TimedRequest.h"
#include "fdbrpc/fdbrpc.h"
@ -88,26 +89,6 @@ std::string getHexString(StringRef input);
bool debugFRMutation(const char* context, Version version, MutationRef const& mutation);
struct RestoreCommonReply {
constexpr static FileIdentifier file_identifier = 5808787;
UID id; // unique ID of the server who sends the reply
bool isDuplicated;
RestoreCommonReply() = default;
explicit RestoreCommonReply(UID id, bool isDuplicated = false) : id(id), isDuplicated(isDuplicated) {}
std::string toString() const {
std::stringstream ss;
ss << "ServerNodeID:" << id.toString() << " isDuplicated:" << isDuplicated;
return ss.str();
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id, isDuplicated);
}
};
struct RestoreSimpleRequest : TimedRequest {
constexpr static FileIdentifier file_identifier = 16448937;

View File

@ -189,7 +189,7 @@ ACTOR Future<Void> monitorWorkerLiveness(Reference<RestoreWorkerData> self) {
loop {
std::vector<std::pair<UID, RestoreSimpleRequest>> requests;
for (auto& worker : self->workerInterfaces) {
requests.push_back(std::make_pair(worker.first, RestoreSimpleRequest()));
requests.emplace_back(worker.first, RestoreSimpleRequest());
}
wait(sendBatchRequests(&RestoreWorkerInterface::heartbeat, self->workerInterfaces, requests));
wait(delay(60.0));

View File

@ -33,12 +33,12 @@
#include <cstdint>
#include <cstdarg>
#include "fdbclient/RestoreWorkerInterface.actor.h"
#include "fdbserver/RestoreUtil.h"
#include "fdbserver/RestoreCommon.actor.h"
#include "fdbserver/RestoreRoleCommon.actor.h"
#include "fdbserver/RestoreLoader.actor.h"
#include "fdbserver/RestoreApplier.actor.h"
#include "fdbserver/RestoreWorkerInterface.actor.h"
// Each restore worker (a process) is assigned for a role.
// MAYBE Later: We will support multiple restore roles on a worker

View File

@ -0,0 +1,102 @@
/*
* RestoreWorkerInterface.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/RestoreWorkerInterface.actor.h"
#include "flow/actorcompiler.h" // must be last include
const KeyRef restoreLeaderKey = "\xff\x02/restoreLeader"_sr;
const KeyRangeRef restoreWorkersKeys("\xff\x02/restoreWorkers/"_sr, "\xff\x02/restoreWorkers0"_sr);
const KeyRef restoreStatusKey = "\xff\x02/restoreStatus/"_sr;
const KeyRangeRef restoreApplierKeys("\xff\x02/restoreApplier/"_sr, "\xff\x02/restoreApplier0"_sr);
const KeyRef restoreApplierTxnValue = "1"_sr;
// restoreApplierKeys: track atomic transaction progress to ensure applying atomicOp exactly once
// Version and batchIndex are passed in as LittleEndian,
// they must be converted to BigEndian to maintain ordering in lexical order
const Key restoreApplierKeyFor(UID const& applierID, int64_t batchIndex, Version version) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(restoreApplierKeys.begin);
wr << applierID << bigEndian64(batchIndex) << bigEndian64(version);
return wr.toValue();
}
std::tuple<UID, int64_t, Version> decodeRestoreApplierKey(ValueRef const& key) {
BinaryReader rd(key, Unversioned());
UID applierID;
int64_t batchIndex;
Version version;
rd >> applierID >> batchIndex >> version;
return std::make_tuple(applierID, bigEndian64(batchIndex), bigEndian64(version));
}
// Encode restore worker key for workerID
const Key restoreWorkerKeyFor(UID const& workerID) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(restoreWorkersKeys.begin);
wr << workerID;
return wr.toValue();
}
// Encode restore agent value
const Value restoreWorkerInterfaceValue(RestoreWorkerInterface const& cmdInterf) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withRestoreWorkerInterfaceValue()));
wr << cmdInterf;
return wr.toValue();
}
RestoreWorkerInterface decodeRestoreWorkerInterfaceValue(ValueRef const& value) {
RestoreWorkerInterface s;
BinaryReader reader(value, IncludeVersion());
reader >> s;
return s;
}
Value restoreRequestDoneVersionValue(Version readVersion) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withRestoreRequestDoneVersionValue()));
wr << readVersion;
return wr.toValue();
}
Version decodeRestoreRequestDoneVersionValue(ValueRef const& value) {
Version v;
BinaryReader reader(value, IncludeVersion());
reader >> v;
return v;
}
RestoreRequest decodeRestoreRequestValue(ValueRef const& value) {
RestoreRequest s;
BinaryReader reader(value, IncludeVersion());
reader >> s;
return s;
}
// TODO: Register restore performance data to restoreStatus key
const Key restoreStatusKeyFor(StringRef statusType) {
BinaryWriter wr(Unversioned());
wr.serializeBytes(restoreStatusKey);
wr << statusType;
return wr.toValue();
}
const Value restoreStatusValue(double val) {
BinaryWriter wr(IncludeVersion(ProtocolVersion::withRestoreStatusValue()));
wr << StringRef(std::to_string(val));
return wr.toValue();
}

View File

@ -22,11 +22,11 @@
// which are RestoreController, RestoreLoader, and RestoreApplier
#pragma once
#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_RESTORE_WORKER_INTERFACE_ACTOR_G_H)
#define FDBCLIENT_RESTORE_WORKER_INTERFACE_ACTOR_G_H
#include "fdbclient/RestoreWorkerInterface.actor.g.h"
#elif !defined(FDBCLIENT_RESTORE_WORKER_INTERFACE_ACTOR_H)
#define FDBCLIENT_RESTORE_WORKER_INTERFACE_ACTOR_H
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_RESTORE_WORKER_INTERFACE_ACTOR_G_H)
#define FDBSERVER_RESTORE_WORKER_INTERFACE_ACTOR_G_H
#include "fdbserver/RestoreWorkerInterface.actor.g.h"
#elif !defined(FDBSERVER_RESTORE_WORKER_INTERFACE_ACTOR_H)
#define FDBSERVER_RESTORE_WORKER_INTERFACE_ACTOR_H
#include <sstream>
#include <string>
@ -707,57 +707,29 @@ struct RestoreUpdateRateRequest : TimedRequest {
}
};
struct RestoreRequest {
constexpr static FileIdentifier file_identifier = 16035338;
int index;
Key tagName;
Key url;
Version targetVersion;
KeyRange range;
UID randomUid;
// Every key in backup will first removePrefix and then addPrefix;
// Simulation testing does not cover when both addPrefix and removePrefix exist yet.
Key addPrefix;
Key removePrefix;
ReplyPromise<struct RestoreCommonReply> reply;
RestoreRequest() = default;
explicit RestoreRequest(const int index,
const Key& tagName,
const Key& url,
Version targetVersion,
const KeyRange& range,
const UID& randomUid,
Key& addPrefix,
Key removePrefix)
: index(index), tagName(tagName), url(url), targetVersion(targetVersion), range(range), randomUid(randomUid),
addPrefix(addPrefix), removePrefix(removePrefix) {}
// To change this serialization, ProtocolVersion::RestoreRequestValue must be updated, and downgrades need to be
// considered
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, index, tagName, url, targetVersion, range, randomUid, addPrefix, removePrefix, reply);
}
std::string toString() const {
std::stringstream ss;
ss << "index:" << std::to_string(index) << " tagName:" << tagName.contents().toString()
<< " url:" << url.contents().toString() << " targetVersion:" << std::to_string(targetVersion)
<< " range:" << range.toString() << " randomUid:" << randomUid.toString()
<< " addPrefix:" << addPrefix.toString() << " removePrefix:" << removePrefix.toString();
return ss.str();
}
};
std::string getRoleStr(RestoreRole role);
////--- Interface functions
ACTOR Future<Void> _restoreWorker(Database cx, LocalityData locality);
ACTOR Future<Void> restoreWorker(Reference<ClusterConnectionFile> ccf, LocalityData locality, std::string coordFolder);
extern const KeyRef restoreLeaderKey;
extern const KeyRangeRef restoreWorkersKeys;
extern const KeyRef restoreStatusKey; // To be used when we measure fast restore performance
extern const KeyRangeRef restoreRequestKeys;
extern const KeyRangeRef restoreApplierKeys;
extern const KeyRef restoreApplierTxnValue;
const Key restoreApplierKeyFor(UID const& applierID, int64_t batchIndex, Version version);
std::tuple<UID, int64_t, Version> decodeRestoreApplierKey(ValueRef const& key);
const Key restoreWorkerKeyFor(UID const& workerID);
const Value restoreWorkerInterfaceValue(RestoreWorkerInterface const& server);
RestoreWorkerInterface decodeRestoreWorkerInterfaceValue(ValueRef const& value);
Version decodeRestoreRequestDoneVersionValue(ValueRef const& value);
RestoreRequest decodeRestoreRequestValue(ValueRef const& value);
const Key restoreStatusKeyFor(StringRef statusType);
const Value restoreStatusValue(double val);
Value restoreRequestDoneVersionValue(Version readVersion);
#include "flow/unactorcompiler.h"
#endif

View File

@ -1203,6 +1203,13 @@ void SimulationConfig::generateNormalConfig(const TestConfig& testConfig) {
// }
// set_config("memory");
// set_config("memory-radixtree-beta");
if (deterministicRandom()->random01() < 0.5) {
set_config("perpetual_storage_wiggle=0");
} else {
set_config("perpetual_storage_wiggle=1");
}
// set_config("perpetual_storage_wiggle=1");
if (testConfig.simpleConfig) {
db.desiredTLogCount = 1;
db.commitProxyCount = 1;

View File

@ -355,8 +355,10 @@ public:
// pre: !finished()
force_inline void prefetch() {
Node* next = x->getNext(level - 1);
_mm_prefetch((const char*)next, _MM_HINT_T0);
_mm_prefetch((const char*)next + 64, _MM_HINT_T0);
if (next) {
_mm_prefetch((const char*)next, _MM_HINT_T0);
_mm_prefetch((const char*)next + 64, _MM_HINT_T0);
}
}
// pre: !finished()

View File

@ -31,6 +31,7 @@
#include "flow/UnitTest.h"
#include "fdbserver/QuietDatabase.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/Knobs.h"
#include "fdbclient/JsonBuilder.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -1806,7 +1807,7 @@ static Future<vector<std::pair<iface, EventMap>>> getServerMetrics(
++futureItr;
}
results.push_back(std::make_pair(servers[i], serverResults));
results.emplace_back(servers[i], serverResults);
}
return results;

View File

@ -112,7 +112,7 @@ struct TransientStorageMetricSample : StorageMetricSample {
int64_t addAndExpire(KeyRef key, int64_t metric, double expiration) {
int64_t x = add(key, metric);
if (x)
queue.push_back(std::make_pair(expiration, std::make_pair(*sample.find(key), -x)));
queue.emplace_back(expiration, std::make_pair(*sample.find(key), -x));
return x;
}

View File

@ -27,6 +27,7 @@
#include "fdbrpc/simulator.h"
#include "fdbrpc/Replication.h"
#include "fdbrpc/ReplicationUtils.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/LogProtocolMessage.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -2117,7 +2118,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
std::vector<Reference<AsyncVar<bool>>> failed;
for (const auto& logVar : logServers.back()->logServers) {
allLogServers.push_back(std::make_pair(logVar, coreSet.tLogPolicy));
allLogServers.emplace_back(logVar, coreSet.tLogPolicy);
failed.push_back(makeReference<AsyncVar<bool>>());
failureTrackers.push_back(monitorLog(logVar, failed.back()));
}
@ -2129,7 +2130,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for (const auto& logSet : oldLogData.back().tLogs) {
for (const auto& logVar : logSet->logServers) {
allLogServers.push_back(std::make_pair(logVar, logSet->tLogPolicy));
allLogServers.emplace_back(logVar, logSet->tLogPolicy);
}
}
}

View File

@ -1004,7 +1004,7 @@ struct RedwoodMetrics {
if (*m.first == '\0') {
*s += "\n";
} else if (!skipZeroes || m.second != 0) {
*s += format("%-15s %-8u %8u/s ", m.first, m.second, int(m.second / elapsed));
*s += format("%-15s %-8u %8" PRId64 "/s ", m.first, m.second, int64_t(m.second / elapsed));
}
}
}
@ -2871,7 +2871,8 @@ struct RedwoodRecordRef {
case 1:
return *(int32_t*)r;
case 2:
return (((int64_t)((int48_t*)r)->high) << 16) | (((int48_t*)r)->low & 0xFFFF);
return ((int64_t) static_cast<uint32_t>(reinterpret_cast<const int48_t*>(r)->high) << 16) |
(((int48_t*)r)->low & 0xFFFF);
case 3:
default:
return *(int64_t*)r;

View File

@ -36,7 +36,6 @@
#include <boost/interprocess/managed_shared_memory.hpp>
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/RestoreWorkerInterface.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/versions.h"
#include "fdbclient/BuildFlags.h"
@ -52,6 +51,7 @@
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/MoveKeys.actor.h"
#include "fdbserver/NetworkTest.h"
#include "fdbserver/RestoreWorkerInterface.actor.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/SimulatedCluster.h"
#include "fdbserver/Status.h"
@ -1050,7 +1050,7 @@ private:
flushAndExit(FDB_EXIT_ERROR);
}
syn = syn.substr(7);
knobs.push_back(std::make_pair(syn, args.OptionArg()));
knobs.emplace_back(syn, args.OptionArg());
break;
}
case OPT_UNITTESTPARAM: {
@ -1359,10 +1359,10 @@ private:
}
// SOMEDAY: ideally we'd have some better way to express that a knob should be elevated to formal
// parameter
knobs.push_back(std::make_pair(
knobs.emplace_back(
"page_cache_4k",
format("%ld", ti.get() / 4096 * 4096))); // The cache holds 4K pages, so we can truncate this to the
// next smaller multiple of 4K.
format("%ld", ti.get() / 4096 * 4096)); // The cache holds 4K pages, so we can truncate this to the
// next smaller multiple of 4K.
break;
case OPT_BUGGIFY:
if (!strcmp(args.OptionArg(), "on"))
@ -2143,7 +2143,7 @@ int main(int argc, char* argv[]) {
s = s.substr(LiteralStringRef("struct ").size());
#endif
typeNames.push_back(std::make_pair(s, i->first));
typeNames.emplace_back(s, i->first);
}
std::sort(typeNames.begin(), typeNames.end());
for (int i = 0; i < typeNames.size(); i++) {

View File

@ -711,15 +711,10 @@ ACTOR Future<vector<Standalone<CommitTransactionRef>>> recruitEverything(Referen
TraceEvent("MasterRecoveryState", self->dbgid)
.detail("StatusCode", RecoveryStatus::recruiting_transaction_servers)
.detail("Status", RecoveryStatus::names[RecoveryStatus::recruiting_transaction_servers])
.detail("RequiredTLogs", self->configuration.tLogReplicationFactor)
.detail("DesiredTLogs", self->configuration.getDesiredLogs())
.detail("Conf", self->configuration.toString())
.detail("RequiredCommitProxies", 1)
.detail("DesiredCommitProxies", self->configuration.getDesiredCommitProxies())
.detail("RequiredGrvProxies", 1)
.detail("DesiredGrvProxies", self->configuration.getDesiredGrvProxies())
.detail("RequiredResolvers", 1)
.detail("DesiredResolvers", self->configuration.getDesiredResolvers())
.detail("StoreType", self->configuration.storageServerStoreType)
.trackLatest("MasterRecoveryState");
// FIXME: we only need log routers for the same locality as the master
@ -732,14 +727,25 @@ ACTOR Future<vector<Standalone<CommitTransactionRef>>> recruitEverything(Referen
wait(brokenPromiseToNever(self->clusterController.recruitFromConfiguration.getReply(
RecruitFromConfigurationRequest(self->configuration, self->lastEpochEnd == 0, maxLogRouters))));
std::string primaryDcIds, remoteDcIds;
self->primaryDcId.clear();
self->remoteDcIds.clear();
if (recruits.dcId.present()) {
self->primaryDcId.push_back(recruits.dcId);
if (!primaryDcIds.empty()) {
primaryDcIds += ',';
}
primaryDcIds += printable(recruits.dcId);
if (self->configuration.regions.size() > 1) {
self->remoteDcIds.push_back(recruits.dcId.get() == self->configuration.regions[0].dcId
? self->configuration.regions[1].dcId
: self->configuration.regions[0].dcId);
Key remoteDcId = recruits.dcId.get() == self->configuration.regions[0].dcId
? self->configuration.regions[1].dcId
: self->configuration.regions[0].dcId;
self->remoteDcIds.push_back(remoteDcId);
if (!remoteDcIds.empty()) {
remoteDcIds += ',';
}
remoteDcIds += printable(remoteDcId);
}
}
self->backupWorkers.swap(recruits.backupWorkers);
@ -755,6 +761,8 @@ ACTOR Future<vector<Standalone<CommitTransactionRef>>> recruitEverything(Referen
.detail("OldLogRouters", recruits.oldLogRouters.size())
.detail("StorageServers", recruits.storageServers.size())
.detail("BackupWorkers", self->backupWorkers.size())
.detail("PrimaryDcIds", primaryDcIds)
.detail("RemoteDcIds", remoteDcIds)
.trackLatest("MasterRecoveryState");
// Actually, newSeedServers does both the recruiting and initialization of the seed servers; so if this is a brand

View File

@ -43143,7 +43143,7 @@ SQLITE_PRIVATE void sqlite3VdbeMakeReady(
p->pFree = sqlite3DbMallocZero(db, nByte);
}
zCsr = p->pFree;
zEnd = &zCsr[nByte];
zEnd = zCsr ? &zCsr[nByte] : NULL;
}while( nByte && !db->mallocFailed );
p->nCursor = (u16)nCursor;

View File

@ -729,7 +729,7 @@ public:
specialCounter(cc, "DurableVersion", [self]() { return self->durableVersion.get(); });
specialCounter(cc, "DesiredOldestVersion", [self]() { return self->desiredOldestVersion.get(); });
specialCounter(cc, "VersionLag", [self]() { return self->versionLag; });
specialCounter(cc, "LocalRate", [self] { return self->currentRate() * 100; });
specialCounter(cc, "LocalRate", [self] { return int64_t(self->currentRate() * 100); });
specialCounter(cc, "BytesReadSampleCount", [self]() { return self->metrics.bytesReadSample.queue.size(); });

View File

@ -39,6 +39,7 @@
#include "fdbclient/MonitorLeader.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.

View File

@ -22,7 +22,7 @@
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BackupContainer.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/RestoreWorkerInterface.actor.h"
#include "fdbserver/RestoreWorkerInterface.actor.h"
#include "fdbclient/RunTransaction.actor.h"
#include "fdbserver/RestoreCommon.actor.h"
#include "fdbserver/workloads/workloads.actor.h"

View File

@ -1,8 +1,29 @@
/*
* ClientTransactionProfileCorrectness.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/RunTransaction.actor.h"
#include "fdbclient/Tuple.h"
#include "flow/actorcompiler.h" // has to be last include
static const Key CLIENT_LATENCY_INFO_PREFIX = LiteralStringRef("client_latency/");

View File

@ -22,6 +22,7 @@
#include "fdbserver/TesterInterface.actor.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/RunTransaction.actor.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbrpc/simulator.h"
#include "flow/actorcompiler.h" // This must be the last #include.

View File

@ -23,7 +23,7 @@
#include "fdbclient/BackupContainer.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "fdbclient/RestoreWorkerInterface.actor.h"
#include "fdbserver/RestoreWorkerInterface.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// A workload which test the correctness of backup and restore process

View File

@ -27,6 +27,7 @@
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/Schemas.h"
#include "fdbclient/SpecialKeySpace.actor.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/IRandom.h"

View File

@ -20,6 +20,7 @@
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/TagThrottle.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbrpc/simulator.h"

View File

@ -556,7 +556,7 @@ private:
LogEvent(EVENTLOG_INFORMATION_TYPE,
format("Found new configuration for process (ID %d)", sp->id));
stop_processes.push_back(sp);
start_ids.push_back(std::make_pair(sp->id, cmd));
start_ids.emplace_back(sp->id, cmd);
} else if (cmd.quiet != sp->command.quiet || cmd.restartDelay != sp->command.restartDelay) {
// Update restartDelay and quiet but do not restart running processes
if (!cmd.quiet || !sp->command.quiet)
@ -585,7 +585,7 @@ private:
std::string section(it->pItem, dot - it->pItem);
Command cmd = makeCommand(ini, section, id);
if (cmd.valid) {
start_ids.push_back(std::make_pair(id, cmd));
start_ids.emplace_back(id, cmd);
} else {
LogEvent(
EVENTLOG_ERROR_TYPE,

View File

@ -444,8 +444,18 @@ public:
StringRef substr(int start) const { return StringRef(data + start, length - start); }
StringRef substr(int start, int size) const { return StringRef(data + start, size); }
bool startsWith(const StringRef& s) const { return size() >= s.size() && !memcmp(begin(), s.begin(), s.size()); }
bool startsWith(const StringRef& s) const {
// Avoid UB - can't pass nullptr to memcmp
if (s.size() == 0) {
return true;
}
return size() >= s.size() && !memcmp(begin(), s.begin(), s.size());
}
bool endsWith(const StringRef& s) const {
// Avoid UB - can't pass nullptr to memcmp
if (s.size() == 0) {
return true;
}
return size() >= s.size() && !memcmp(end() - s.size(), s.begin(), s.size());
}
@ -782,6 +792,7 @@ struct VectorRefPreserializer {
void invalidate() {}
void add(const T& item) {}
void remove(const T& item) {}
void reset() {}
};
template <class T>
@ -813,6 +824,7 @@ struct VectorRefPreserializer<T, VecSerStrategy::String> {
_cached_size -= _string_traits.getSize(item);
}
}
void reset() { _cached_size = 0; }
};
template <class T, VecSerStrategy SerStrategy = VecSerStrategy::FlatBuffers>
@ -1031,6 +1043,11 @@ public:
m_size = size;
}
void clear() {
VPS::reset();
m_size = 0;
}
void reserve(Arena& p, int size) {
if (size > m_capacity)
reallocate(p, size);

View File

@ -119,7 +119,7 @@ void setFastAllocatorThreadInitFunction(ThreadInitFunction f) {
std::atomic<int64_t> g_hugeArenaMemory(0);
double hugeArenaLastLogged = 0;
std::map<std::string, std::pair<int, int>> hugeArenaTraces;
std::map<std::string, std::pair<int, int64_t>> hugeArenaTraces;
void hugeArenaSample(int size) {
if (TraceEvent::isNetworkThread()) {
@ -564,7 +564,7 @@ void FastAllocator<Size>::releaseThreadMagazines() {
if (thr.freelist || thr.alternate) {
if (thr.freelist) {
ASSERT(thr.count > 0 && thr.count <= magazine_size);
globalData()->partial_magazines.push_back(std::make_pair(thr.count, thr.freelist));
globalData()->partial_magazines.emplace_back(thr.count, thr.freelist);
globalData()->partialMagazineUnallocatedMemory += thr.count * Size;
}
if (thr.alternate) {

View File

@ -28,6 +28,7 @@
// either we pull g_simulator into flow, or flow (and the I/O path) will be unable to log performance
// metrics.
#include <fdbrpc/simulator.h>
#include <limits>
// pull in some global pointers too: These types are implemented in fdbrpc/sim2.actor.cpp, which is not available here.
// Yuck. If you're not using the simulator, these will remain null, and all should be well.
@ -117,7 +118,7 @@ void Histogram::writeToLog() {
e.detail("Group", group).detail("Op", op).detail("Unit", UnitToStringMapper.at(unit));
for (uint32_t i = 0; i < 32; i++) {
uint32_t value = ((uint32_t)1) << (i + 1);
uint64_t value = uint64_t(1) << (i + 1);
if (buckets[i]) {
switch (unit) {

View File

@ -2998,7 +2998,7 @@ void outOfMemory() {
else if (StringRef(s).startsWith(LiteralStringRef("struct ")))
s = s.substr(LiteralStringRef("struct ").size());
#endif
typeNames.push_back(std::make_pair(s, i->first));
typeNames.emplace_back(s, i->first);
}
std::sort(typeNames.begin(), typeNames.end());
for (int i = 0; i < typeNames.size(); i++) {

View File

@ -285,7 +285,7 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState* sta
else if (StringRef(s).startsWith(LiteralStringRef("struct ")))
s = s.substr(LiteralStringRef("struct ").size());
#endif
typeNames.push_back(std::make_pair(s, i->first));
typeNames.emplace_back(s, i->first);
}
std::sort(typeNames.begin(), typeNames.end());
for (int i = 0; i < typeNames.size(); i++) {

View File

@ -881,7 +881,7 @@ struct EventMetric final : E, ReferenceCounted<EventMetric<E>>, MetricUtil<Event
time.flushField(mk, rollTime, batch);
flushFields(typename Descriptor<E>::field_indexes(), mk, rollTime, batch);
if (!latestRecorded) {
batch.updates.push_back(std::make_pair(mk.packLatestKey(), StringRef()));
batch.updates.emplace_back(mk.packLatestKey(), StringRef());
latestRecorded = true;
}
}
@ -1249,7 +1249,7 @@ public:
void flushData(const MetricKeyRef& mk, uint64_t rollTime, MetricUpdateBatch& batch) override {
if (!recorded) {
batch.updates.push_back(std::make_pair(mk.packLatestKey(), getLatestAsValue()));
batch.updates.emplace_back(mk.packLatestKey(), getLatestAsValue());
recorded = true;
}

View File

@ -198,7 +198,7 @@ void DynamicEventMetric::flushData(MetricKeyRef const& mk, uint64_t rollTime, Me
for (auto& [name, field] : fields)
field->flushField(mk, rollTime, batch);
if (!latestRecorded) {
batch.updates.push_back(std::make_pair(mk.packLatestKey(), StringRef()));
batch.updates.emplace_back(mk.packLatestKey(), StringRef());
latestRecorded = true;
}
}

View File

@ -20,6 +20,7 @@
#include "benchmark/benchmark.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/SystemData.h"