Merge branch 'main' of github.com:apple/foundationdb into jfu-mako-active-tenants

This commit is contained in:
Jon Fu 2022-06-16 10:32:20 -07:00
commit 9bfc6ee59e
9 changed files with 98 additions and 176 deletions

View File

@ -88,7 +88,7 @@ Transaction createNewTransaction(Database db, Arguments const& args, int id = -1
}
// Create Tenant Transaction
int tenant_id = (id == -1) ? urand(0, args.active_tenants - 1) : id;
// If provided tenants array (only necessary in runWorkload), use it
// If provided tenants array, use it
if (tenants) {
return tenants[tenant_id].createTransaction();
}
@ -168,10 +168,7 @@ int populate(Database db,
int thread_id,
int thread_tps,
ThreadStatistics& stats) {
const auto key_begin = insertBegin(args.rows, worker_id, thread_id, args.num_processes, args.num_threads);
const auto key_end = insertEnd(args.rows, worker_id, thread_id, args.num_processes, args.num_threads);
auto xacts = 0;
auto keystr = ByteString{};
auto valstr = ByteString{};
keystr.resize(args.key_length);
@ -182,7 +179,6 @@ int populate(Database db,
auto watch_throttle = Stopwatch(watch_total.getStart());
auto watch_tx = Stopwatch(watch_total.getStart());
auto watch_trace = Stopwatch(watch_total.getStart());
auto key_checkpoint = key_begin; // in case of commit failure, restart from this key
Transaction systemTx = db.createTransaction();
for (int i = 0; i < args.total_tenants; ++i) {
@ -205,77 +201,92 @@ int populate(Database db,
}
}
}
Transaction tx = createNewTransaction(db, args);
for (auto i = key_begin; i <= key_end; i++) {
/* sequential keys */
genKey(keystr.data(), KEY_PREFIX, args, i);
/* random values */
randomString(valstr.data(), args.value_length);
while (thread_tps > 0 && xacts >= thread_tps /* throttle */) {
if (toIntegerSeconds(watch_throttle.stop().diff()) >= 1) {
xacts = 0;
watch_throttle.startFromStop();
} else {
usleep(1000);
}
}
if (num_seconds_trace_every) {
if (toIntegerSeconds(watch_trace.stop().diff()) >= num_seconds_trace_every) {
watch_trace.startFromStop();
logr.debug("txn tracing {}", toCharsRef(keystr));
auto err = Error{};
err = tx.setOptionNothrow(FDB_TR_OPTION_DEBUG_TRANSACTION_IDENTIFIER, keystr);
if (err) {
logr.error("setOption(TR_OPTION_DEBUG_TRANSACTION_IDENTIFIER): {}", err.what());
}
err = tx.setOptionNothrow(FDB_TR_OPTION_LOG_TRANSACTION, BytesRef());
if (err) {
logr.error("setOption(TR_OPTION_LOG_TRANSACTION): {}", err.what());
}
}
}
/* insert (SET) */
tx.set(keystr, valstr);
stats.incrOpCount(OP_INSERT);
/* commit every 100 inserts (default) or if this is the last key */
if ((i % num_commit_every == 0) || i == key_end) {
const auto do_sample = (stats.getOpCount(OP_TRANSACTION) % args.sampling) == 0;
auto watch_commit = Stopwatch(StartAtCtor{});
auto future_commit = tx.commit();
const auto rc = waitAndHandleError(tx, future_commit, "COMMIT_POPULATE_INSERT");
watch_commit.stop();
watch_tx.setStop(watch_commit.getStop());
auto tx_restarter = ExitGuard([&watch_tx]() { watch_tx.startFromStop(); });
if (rc == FutureRC::OK) {
key_checkpoint = i + 1; // restart on failures from next key
tx = createNewTransaction(db, args);
} else if (rc == FutureRC::ABORT) {
return -1;
} else {
i = key_checkpoint - 1; // restart from last committed
continue;
}
/* xact latency stats */
if (do_sample) {
const auto commit_latency = watch_commit.diff();
const auto tx_duration = watch_tx.diff();
stats.addLatency(OP_COMMIT, commit_latency);
stats.addLatency(OP_TRANSACTION, tx_duration);
}
stats.incrOpCount(OP_COMMIT);
stats.incrOpCount(OP_TRANSACTION);
xacts++; /* for throttling */
}
// mimic typical tenant usage: keep tenants in memory
// and create transactions as needed
Tenant tenants[args.active_tenants];
for (int i = 0; i < args.active_tenants; ++i) {
std::string tenantStr = "tenant" + std::to_string(i);
BytesRef tenant_name = toBytesRef(tenantStr);
tenants[i] = db.openTenant(tenant_name);
}
int populate_iters = args.active_tenants > 0 ? args.active_tenants : 1;
// Each tenant should have the same range populated
for (auto t_id = 0; t_id < populate_iters; ++t_id) {
Transaction tx = createNewTransaction(db, args, t_id, args.active_tenants > 0 ? tenants : nullptr);
const auto key_begin = insertBegin(args.rows, worker_id, thread_id, args.num_processes, args.num_threads);
const auto key_end = insertEnd(args.rows, worker_id, thread_id, args.num_processes, args.num_threads);
auto key_checkpoint = key_begin; // in case of commit failure, restart from this key
for (auto i = key_begin; i <= key_end; i++) {
/* sequential keys */
genKey(keystr.data(), KEY_PREFIX, args, i);
/* random values */
randomString(valstr.data(), args.value_length);
while (thread_tps > 0 && xacts >= thread_tps /* throttle */) {
if (toIntegerSeconds(watch_throttle.stop().diff()) >= 1) {
xacts = 0;
watch_throttle.startFromStop();
} else {
usleep(1000);
}
}
if (num_seconds_trace_every) {
if (toIntegerSeconds(watch_trace.stop().diff()) >= num_seconds_trace_every) {
watch_trace.startFromStop();
logr.debug("txn tracing {}", toCharsRef(keystr));
auto err = Error{};
err = tx.setOptionNothrow(FDB_TR_OPTION_DEBUG_TRANSACTION_IDENTIFIER, keystr);
if (err) {
logr.error("setOption(TR_OPTION_DEBUG_TRANSACTION_IDENTIFIER): {}", err.what());
}
err = tx.setOptionNothrow(FDB_TR_OPTION_LOG_TRANSACTION, BytesRef());
if (err) {
logr.error("setOption(TR_OPTION_LOG_TRANSACTION): {}", err.what());
}
}
}
/* insert (SET) */
tx.set(keystr, valstr);
stats.incrOpCount(OP_INSERT);
/* commit every 100 inserts (default) or if this is the last key */
if ((i % num_commit_every == 0) || i == key_end) {
const auto do_sample = (stats.getOpCount(OP_TRANSACTION) % args.sampling) == 0;
auto watch_commit = Stopwatch(StartAtCtor{});
auto future_commit = tx.commit();
const auto rc = waitAndHandleError(tx, future_commit, "COMMIT_POPULATE_INSERT");
watch_commit.stop();
watch_tx.setStop(watch_commit.getStop());
auto tx_restarter = ExitGuard([&watch_tx]() { watch_tx.startFromStop(); });
if (rc == FutureRC::OK) {
key_checkpoint = i + 1; // restart on failures from next key
tx = createNewTransaction(db, args, t_id, args.active_tenants > 0 ? tenants : nullptr);
} else if (rc == FutureRC::ABORT) {
return -1;
} else {
i = key_checkpoint - 1; // restart from last committed
continue;
}
/* xact latency stats */
if (do_sample) {
const auto commit_latency = watch_commit.diff();
const auto tx_duration = watch_tx.diff();
stats.addLatency(OP_COMMIT, commit_latency);
stats.addLatency(OP_TRANSACTION, tx_duration);
}
stats.incrOpCount(OP_COMMIT);
stats.incrOpCount(OP_TRANSACTION);
xacts++; /* for throttling */
}
}
logr.debug("Populated {} rows [{}, {}]: {:6.3f} sec",
key_end - key_begin + 1,
key_begin,
key_end,
toDoubleSeconds(watch_total.stop().diff()));
}
logr.debug("Populated {} rows [{}, {}]: {:6.3f} sec",
key_end - key_begin + 1,
key_begin,
key_end,
toDoubleSeconds(watch_total.stop().diff()));
return 0;
}
@ -2107,6 +2118,10 @@ int main(int argc, char* argv[]) {
/* usage printed */
return 0;
}
if (args.active_tenants > 1) {
args.rows = args.rows / args.active_tenants;
args.row_digits = digits(args.rows);
}
rc = validateArguments(args);
if (rc < 0)

View File

@ -22,7 +22,6 @@
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/SignalSafeUnwind.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// Stress test the slow task profiler or flow profiler
@ -43,24 +42,22 @@ struct SlowTaskWorkload : TestWorkload {
ACTOR static Future<Void> go() {
wait(delay(1));
int64_t phc = dl_iterate_phdr_calls;
int64_t startProfilesDeferred = getNumProfilesDeferred();
int64_t startProfilesOverflowed = getNumProfilesOverflowed();
int64_t startProfilesCaptured = getNumProfilesCaptured();
int64_t exc = 0;
fprintf(stderr, "Slow task starting\n");
fprintf(stdout, "Slow task starting\n");
for (int i = 0; i < 10; i++) {
fprintf(stderr, " %d\n", i);
fprintf(stdout, " %d\n", i);
double end = timer() + 1;
while (timer() < end) {
do_slow_exception_thing(&exc);
}
}
fmt::print(stderr,
"Slow task complete: {0} exceptions; {1} calls to dl_iterate_phdr, {2}"
" profiles deferred, {3} profiles overflowed, {4} profiles captured\n",
fmt::print(stdout,
"Slow task complete: {0} exceptions; {1} profiles deferred, {2} profiles overflowed, {3} profiles "
"captured\n",
exc,
dl_iterate_phdr_calls - phc,
getNumProfilesDeferred() - startProfilesDeferred,
getNumProfilesOverflowed() - startProfilesOverflowed,
getNumProfilesCaptured() - startProfilesCaptured);

View File

@ -62,8 +62,6 @@ set(FLOW_SRCS
Profiler.h
ScopeExit.h
SendBufferIterator.h
SignalSafeUnwind.cpp
SignalSafeUnwind.h
SimpleOpt.h
StreamCipher.cpp
StreamCipher.h

View File

@ -89,10 +89,6 @@ void initProfiling() {
net2backtraces = new volatile void*[net2backtraces_max];
other_backtraces = new volatile void*[net2backtraces_max];
// According to folk wisdom, calling this once before setting up the signal handler makes
// it async signal safe in practice :-/
backtrace(const_cast<void**>(other_backtraces), net2backtraces_max);
sigemptyset(&sigprof_set);
sigaddset(&sigprof_set, SIGPROF);
}

View File

@ -3738,7 +3738,7 @@ void profileHandler(int sig) {
ps->timestamp = checkThreadTime.is_lock_free() ? checkThreadTime.load() : 0;
// SOMEDAY: should we limit the maximum number of frames from backtrace beyond just available space?
size_t size = backtrace(ps->frames, net2backtraces_max - net2backtraces_offset - 2);
size_t size = platform::raw_backtrace(ps->frames, net2backtraces_max - net2backtraces_offset - 2);
ps->length = size;

View File

@ -1,55 +0,0 @@
/*
* SignalSafeUnwind.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/SignalSafeUnwind.h"
int64_t dl_iterate_phdr_calls = 0;
#if defined(__linux__) && !defined(USE_SANITIZER)
#include <link.h>
#include <mutex>
static int (*chain_dl_iterate_phdr)(int (*callback)(struct dl_phdr_info* info, size_t size, void* data),
void* data) = nullptr;
static void initChain() {
static std::once_flag flag;
// Ensure that chain_dl_iterate_phdr points to the "real" function that we are overriding
std::call_once(flag, []() { *(void**)&chain_dl_iterate_phdr = dlsym(RTLD_NEXT, "dl_iterate_phdr"); });
if (!chain_dl_iterate_phdr) {
criticalError(FDB_EXIT_ERROR, "SignalSafeUnwindError", "Unable to find dl_iterate_phdr symbol");
}
}
// This overrides the function in libc!
extern "C" int dl_iterate_phdr(int (*callback)(struct dl_phdr_info* info, size_t size, void* data), void* data) {
interlockedIncrement64(&dl_iterate_phdr_calls);
initChain();
setProfilingEnabled(0);
int result = chain_dl_iterate_phdr(callback, data);
setProfilingEnabled(1);
return result;
}
#endif

View File

@ -1,30 +0,0 @@
/*
* SignalSafeUnwind.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FLOW_SIGNAL_SAFE_UNWIND
#define FLOW_SIGNAL_SAFE_UNWIND
#pragma once
#include "flow/Platform.h"
// This can be used by tests to measure the number of calls to dl_iterate_phdr intercepted
extern int64_t dl_iterate_phdr_calls;
#endif

View File

@ -94,7 +94,7 @@ if(WITH_PYTHON)
else()
add_fdb_test(TEST_FILES SimpleExternalTest.txt IGNORE)
endif()
add_fdb_test(TEST_FILES SlowTask.txt IGNORE)
add_fdb_test(TEST_FILES noSim/SlowTask.txt IGNORE)
add_fdb_test(TEST_FILES SpecificUnitTest.txt IGNORE)
add_fdb_test(TEST_FILES StorageMetricsSampleTests.txt IGNORE)
add_fdb_test(TEST_FILES WorkerTests.txt IGNORE)

View File

@ -1,2 +1,3 @@
useDB=false
testTitle=Slow Task Stress
testName=SlowTaskWorkload