Merge branch 'main' of https://github.com/apple/foundationdb into fix/main/grvError
This commit is contained in:
commit
e836b743c0
|
@ -484,7 +484,7 @@ elseif(NOT WIN32 AND NOT APPLE AND NOT USE_SANITIZER) # Linux Only, non-santizer
|
|||
--outdir ${SHIM_LIB_OUTPUT_DIR}
|
||||
--dlopen-callback=fdb_shim_dlopen_callback
|
||||
$<TARGET_FILE:fdb_c>
|
||||
DEPENDS ${IMPLIBSO_SRC}
|
||||
DEPENDS ${IMPLIBSO_SRC} fdb_c
|
||||
COMMENT "Generating source code for C shim library")
|
||||
|
||||
add_library(fdb_c_shim STATIC ${SHIM_LIB_GEN_SRC} foundationdb/fdb_c_shim.h fdb_c_shim.cpp)
|
||||
|
|
|
@ -81,6 +81,8 @@ public:
|
|||
: fdbTx(tx), txActor(txActor), contAfterDone(cont), scheduler(scheduler), retryLimit(retryLimit),
|
||||
txState(TxState::IN_PROGRESS), commitCalled(false), bgBasePath(bgBasePath) {}
|
||||
|
||||
virtual ~TransactionContextBase() { ASSERT(txState == TxState::DONE); }
|
||||
|
||||
// A state machine:
|
||||
// IN_PROGRESS -> (ON_ERROR -> IN_PROGRESS)* [-> ON_ERROR] -> DONE
|
||||
enum class TxState { IN_PROGRESS, ON_ERROR, DONE };
|
||||
|
@ -114,6 +116,10 @@ public:
|
|||
}
|
||||
txState = TxState::DONE;
|
||||
lock.unlock();
|
||||
|
||||
// No need for lock from here on, because only one thread
|
||||
// can enter DONE state and handle it
|
||||
|
||||
if (retriedErrors.size() >= LARGE_NUMBER_OF_RETRIES) {
|
||||
fmt::print("Transaction succeeded after {} retries on errors: {}\n",
|
||||
retriedErrors.size(),
|
||||
|
@ -124,6 +130,7 @@ public:
|
|||
fdbTx.cancel();
|
||||
txActor->complete(fdb::Error::success());
|
||||
cleanUp();
|
||||
ASSERT(txState == TxState::DONE);
|
||||
contAfterDone();
|
||||
}
|
||||
|
||||
|
@ -150,6 +157,10 @@ protected:
|
|||
}
|
||||
txState = TxState::DONE;
|
||||
lock.unlock();
|
||||
|
||||
// No need for lock from here on, because only one thread
|
||||
// can enter DONE state and handle it
|
||||
|
||||
txActor->complete(err);
|
||||
cleanUp();
|
||||
contAfterDone();
|
||||
|
@ -164,6 +175,7 @@ protected:
|
|||
transactionFailed(err);
|
||||
} else {
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
ASSERT(txState == TxState::ON_ERROR);
|
||||
txState = TxState::IN_PROGRESS;
|
||||
commitCalled = false;
|
||||
lock.unlock();
|
||||
|
@ -197,43 +209,58 @@ protected:
|
|||
}
|
||||
|
||||
// FDB transaction
|
||||
// Provides a thread safe interface by itself (no need for mutex)
|
||||
fdb::Transaction fdbTx;
|
||||
|
||||
// Actor implementing the transaction worklflow
|
||||
// Set in constructor and reset on cleanup (no need for mutex)
|
||||
std::shared_ptr<ITransactionActor> txActor;
|
||||
|
||||
// Mutex protecting access to shared mutable state
|
||||
// Only the state that is accessible unter IN_PROGRESS state
|
||||
// must be protected by mutex
|
||||
std::mutex mutex;
|
||||
|
||||
// Continuation to be called after completion of the transaction
|
||||
TTaskFct contAfterDone;
|
||||
// Set in contructor, stays immutable
|
||||
const TTaskFct contAfterDone;
|
||||
|
||||
// Reference to the scheduler
|
||||
IScheduler* scheduler;
|
||||
// Set in contructor, stays immutable
|
||||
// Cannot be accessed in DONE state, workloads can be completed and the scheduler deleted
|
||||
IScheduler* const scheduler;
|
||||
|
||||
// Retry limit
|
||||
int retryLimit;
|
||||
// Set in contructor, stays immutable
|
||||
const int retryLimit;
|
||||
|
||||
// Transaction execution state
|
||||
// Must be accessed under mutex
|
||||
TxState txState;
|
||||
|
||||
// onError future used in ON_ERROR state
|
||||
// onError future
|
||||
// used only in ON_ERROR state (no need for mutex)
|
||||
fdb::Future onErrorFuture;
|
||||
|
||||
// The error code on which onError was called
|
||||
// used only in ON_ERROR state (no need for mutex)
|
||||
fdb::Error onErrorArg;
|
||||
|
||||
// The time point of calling onError
|
||||
// used only in ON_ERROR state (no need for mutex)
|
||||
TimePoint onErrorCallTimePoint;
|
||||
|
||||
// Transaction is committed or being committed
|
||||
// Must be accessed under mutex
|
||||
bool commitCalled;
|
||||
|
||||
// A history of errors on which the transaction was retried
|
||||
// used only in ON_ERROR and DONE states (no need for mutex)
|
||||
std::vector<fdb::Error> retriedErrors;
|
||||
|
||||
// blob granule base path
|
||||
std::string bgBasePath;
|
||||
// Set in contructor, stays immutable
|
||||
const std::string bgBasePath;
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -383,7 +410,6 @@ protected:
|
|||
if (txState != TxState::IN_PROGRESS) {
|
||||
return;
|
||||
}
|
||||
lock.unlock();
|
||||
fdb::Error err = f.error();
|
||||
auto waitTimeUs = timeElapsedInUs(cbInfo.startTime, endTime);
|
||||
if (waitTimeUs > LONG_WAIT_TIME_US) {
|
||||
|
@ -399,6 +425,10 @@ protected:
|
|||
scheduler->schedule(cbInfo.cont);
|
||||
return;
|
||||
}
|
||||
// We keep lock until here to prevent transitions from the IN_PROGRESS state
|
||||
// which could possibly lead to completion of the workload and destruction
|
||||
// of the scheduler
|
||||
lock.unlock();
|
||||
onError(err);
|
||||
}
|
||||
|
||||
|
@ -411,6 +441,9 @@ protected:
|
|||
txState = TxState::ON_ERROR;
|
||||
lock.unlock();
|
||||
|
||||
// No need to hold the lock from here on, because ON_ERROR state is handled sequentially, and
|
||||
// other callbacks are simply ignored while it stays in this state
|
||||
|
||||
if (!canRetry(err)) {
|
||||
return;
|
||||
}
|
||||
|
@ -490,9 +523,12 @@ protected:
|
|||
};
|
||||
|
||||
// Map for keeping track of future waits and holding necessary object references
|
||||
// It can be accessed at any time when callbacks are triggered, so it mus always
|
||||
// be mutex protected
|
||||
std::unordered_map<fdb::Future, CallbackInfo> callbackMap;
|
||||
|
||||
// Holding reference to this for onError future C callback
|
||||
// Accessed only in ON_ERROR state (no need for mutex)
|
||||
std::shared_ptr<AsyncTransactionContext> onErrorThisRef;
|
||||
};
|
||||
|
||||
|
|
|
@ -80,13 +80,14 @@ bool WorkloadConfig::getBoolOption(const std::string& name, bool defaultVal) con
|
|||
|
||||
WorkloadBase::WorkloadBase(const WorkloadConfig& config)
|
||||
: manager(nullptr), tasksScheduled(0), numErrors(0), clientId(config.clientId), numClients(config.numClients),
|
||||
failed(false), numTxCompleted(0) {
|
||||
failed(false), numTxCompleted(0), numTxStarted(0), inProgress(false) {
|
||||
maxErrors = config.getIntOption("maxErrors", 10);
|
||||
workloadId = fmt::format("{}{}", config.name, clientId);
|
||||
}
|
||||
|
||||
void WorkloadBase::init(WorkloadManager* manager) {
|
||||
this->manager = manager;
|
||||
inProgress = true;
|
||||
}
|
||||
|
||||
void WorkloadBase::printStats() {
|
||||
|
@ -94,6 +95,7 @@ void WorkloadBase::printStats() {
|
|||
}
|
||||
|
||||
void WorkloadBase::schedule(TTaskFct task) {
|
||||
ASSERT(inProgress);
|
||||
if (failed) {
|
||||
return;
|
||||
}
|
||||
|
@ -105,10 +107,12 @@ void WorkloadBase::schedule(TTaskFct task) {
|
|||
}
|
||||
|
||||
void WorkloadBase::execTransaction(std::shared_ptr<ITransactionActor> tx, TTaskFct cont, bool failOnError) {
|
||||
ASSERT(inProgress);
|
||||
if (failed) {
|
||||
return;
|
||||
}
|
||||
tasksScheduled++;
|
||||
numTxStarted++;
|
||||
manager->txExecutor->execute(tx, [this, tx, cont, failOnError]() {
|
||||
numTxCompleted++;
|
||||
fdb::Error err = tx->getError();
|
||||
|
@ -143,11 +147,13 @@ void WorkloadBase::error(const std::string& msg) {
|
|||
|
||||
void WorkloadBase::scheduledTaskDone() {
|
||||
if (--tasksScheduled == 0) {
|
||||
inProgress = false;
|
||||
if (numErrors > 0) {
|
||||
error(fmt::format("Workload failed with {} errors", numErrors.load()));
|
||||
} else {
|
||||
info("Workload successfully completed");
|
||||
}
|
||||
ASSERT(numTxStarted == numTxCompleted);
|
||||
manager->workloadDone(this, numErrors > 0);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -164,6 +164,12 @@ protected:
|
|||
|
||||
// Number of completed transactions
|
||||
std::atomic<int> numTxCompleted;
|
||||
|
||||
// Number of started transactions
|
||||
std::atomic<int> numTxStarted;
|
||||
|
||||
// Workload is in progress (intialized, but not completed)
|
||||
std::atomic<bool> inProgress;
|
||||
};
|
||||
|
||||
// Workload manager
|
||||
|
|
|
@ -666,6 +666,7 @@ public:
|
|||
static void createTenant(Transaction tr, BytesRef name) {
|
||||
tr.setOption(FDBTransactionOption::FDB_TR_OPTION_SPECIAL_KEY_SPACE_ENABLE_WRITES, BytesRef());
|
||||
tr.setOption(FDBTransactionOption::FDB_TR_OPTION_LOCK_AWARE, BytesRef());
|
||||
tr.setOption(FDBTransactionOption::FDB_TR_OPTION_RAW_ACCESS, BytesRef());
|
||||
tr.set(toBytesRef(fmt::format("{}{}", tenantManagementMapPrefix, toCharsRef(name))), BytesRef());
|
||||
}
|
||||
|
||||
|
|
|
@ -102,6 +102,11 @@ func (o NetworkOptions) SetTraceFileIdentifier(param string) error {
|
|||
return o.setOpt(36, []byte(param))
|
||||
}
|
||||
|
||||
// Use the same base trace file name for all client threads as it did before version 7.2. The current default behavior is to use distinct trace file names for client threads by including their version and thread index.
|
||||
func (o NetworkOptions) SetTraceShareAmongClientThreads() error {
|
||||
return o.setOpt(37, nil)
|
||||
}
|
||||
|
||||
// Set file suffix for partially written log files.
|
||||
//
|
||||
// Parameter: Append this suffix to partially written log files. When a log file is complete, it is renamed to remove the suffix. No separator is added between the file and the suffix. If you want to add a file extension, you should include the separator - e.g. '.tmp' instead of 'tmp' to add the 'tmp' extension.
|
||||
|
|
|
@ -7,6 +7,7 @@ import subprocess
|
|||
import logging
|
||||
import functools
|
||||
import json
|
||||
import tempfile
|
||||
import time
|
||||
import random
|
||||
from argparse import ArgumentParser, RawDescriptionHelpFormatter
|
||||
|
@ -770,6 +771,68 @@ def integer_options():
|
|||
assert lines[1].startswith('Committed')
|
||||
assert error_output == b''
|
||||
|
||||
def tls_address_suffix():
|
||||
# fdbcli shall prevent a non-TLS fdbcli run from connecting to an all-TLS cluster, and vice versa
|
||||
preamble = 'eNW1yf1M:eNW1yf1M@'
|
||||
def make_addr(port: int, tls: bool = False):
|
||||
return "127.0.0.1:{}{}".format(port, ":tls" if tls else "")
|
||||
testcases = [
|
||||
# IsServerTLS, NumServerAddrs
|
||||
(True, 1),
|
||||
(False, 1),
|
||||
(True, 3),
|
||||
(False, 3),
|
||||
]
|
||||
err_output_server_no_tls = "ERROR: fdbcli is configured with TLS, but none of the coordinators have TLS addresses."
|
||||
err_output_server_tls = "ERROR: fdbcli is not configured with TLS, but all of the coordinators have TLS addresses."
|
||||
|
||||
# technically the contents of the certs and key files are not evaluated
|
||||
# before tls-suffix check against tls configuration takes place,
|
||||
# but we generate the certs and keys anyway to avoid
|
||||
# imposing nuanced TLSConfig evaluation ordering requirement on the testcase
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
cert_file = tmpdir + "/client-cert.pem"
|
||||
key_file = tmpdir + "/client-key.pem"
|
||||
ca_file = tmpdir + "/server-ca.pem"
|
||||
mkcert_process = subprocess.run([
|
||||
args.build_dir + "/bin/mkcert",
|
||||
"--server-chain-length", "1",
|
||||
"--client-chain-length", "1",
|
||||
"--server-cert-file", tmpdir + "/server-cert.pem",
|
||||
"--client-cert-file", tmpdir + "/client-cert.pem",
|
||||
"--server-key-file", tmpdir + "/server-key.pem",
|
||||
"--client-key-file", tmpdir + "/client-key.pem",
|
||||
"--server-ca-file", tmpdir + "/server-ca.pem",
|
||||
"--client-ca-file", tmpdir + "/client-ca.pem",
|
||||
],
|
||||
capture_output=True)
|
||||
if mkcert_process.returncode != 0:
|
||||
print("mkcert returned with code {}".format(mkcert_process.returncode))
|
||||
print("Output:\n{}{}\n".format(
|
||||
mkcert_process.stdout.decode("utf8").strip(),
|
||||
mkcert_process.stderr.decode("utf8").strip()))
|
||||
assert False
|
||||
cluster_fn = tmpdir + "/fdb.cluster"
|
||||
for testcase in testcases:
|
||||
is_server_tls, num_server_addrs = testcase
|
||||
with open(cluster_fn, "w") as fp:
|
||||
fp.write(preamble + ",".join(
|
||||
[make_addr(port=4000 + addr_idx, tls=is_server_tls) for addr_idx in range(num_server_addrs)]))
|
||||
fp.close()
|
||||
tls_args = ["--tls-certificate-file",
|
||||
cert_file,
|
||||
"--tls-key-file",
|
||||
key_file,
|
||||
"--tls-ca-file",
|
||||
ca_file] if not is_server_tls else []
|
||||
fdbcli_process = subprocess.run(command_template[:2] + [cluster_fn] + tls_args, capture_output=True)
|
||||
assert fdbcli_process.returncode != 0
|
||||
err_out = fdbcli_process.stderr.decode("utf8").strip()
|
||||
if is_server_tls:
|
||||
assert err_out == err_output_server_tls, f"unexpected output: {err_out}"
|
||||
else:
|
||||
assert err_out == err_output_server_no_tls, f"unexpected output: {err_out}"
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
|
||||
description="""
|
||||
|
@ -816,6 +879,7 @@ if __name__ == '__main__':
|
|||
tenants()
|
||||
versionepoch()
|
||||
integer_options()
|
||||
tls_address_suffix()
|
||||
else:
|
||||
assert args.process_number > 1, "Process number should be positive"
|
||||
coordinators()
|
||||
|
|
|
@ -127,19 +127,24 @@ ACTOR Future<bool> blobRangeCommandActor(Database localDb,
|
|||
}
|
||||
fmt::print("{0} blobbify range for [{1} - {2})\n",
|
||||
starting ? "Starting" : "Stopping",
|
||||
tokens[2].printable().c_str(),
|
||||
tokens[3].printable().c_str());
|
||||
tokens[2].printable(),
|
||||
tokens[3].printable());
|
||||
state bool success = false;
|
||||
if (starting) {
|
||||
wait(store(success, localDb->blobbifyRange(KeyRangeRef(begin, end))));
|
||||
} else {
|
||||
wait(store(success, localDb->unblobbifyRange(KeyRangeRef(begin, end))));
|
||||
}
|
||||
if (!success) {
|
||||
if (success) {
|
||||
fmt::print("{0} updated blob range [{1} - {2}) succeeded\n",
|
||||
starting ? "Starting" : "Stopping",
|
||||
tokens[2].printable(),
|
||||
tokens[3].printable());
|
||||
} else {
|
||||
fmt::print("{0} blobbify range for [{1} - {2}) failed\n",
|
||||
starting ? "Starting" : "Stopping",
|
||||
tokens[2].printable().c_str(),
|
||||
tokens[3].printable().c_str());
|
||||
tokens[2].printable(),
|
||||
tokens[3].printable());
|
||||
}
|
||||
return success;
|
||||
} else if (tokencmp(tokens[1], "purge") || tokencmp(tokens[1], "forcepurge") || tokencmp(tokens[1], "check")) {
|
||||
|
|
|
@ -1050,7 +1050,7 @@ Future<T> stopNetworkAfter(Future<T> what) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
||||
ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterConnectionFile> ccf) {
|
||||
state LineNoise& linenoise = *plinenoise;
|
||||
state bool intrans = false;
|
||||
|
||||
|
@ -1075,20 +1075,6 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
|
||||
state FdbOptions* options = &globalOptions;
|
||||
|
||||
state Reference<ClusterConnectionFile> ccf;
|
||||
|
||||
state std::pair<std::string, bool> resolvedClusterFile =
|
||||
ClusterConnectionFile::lookupClusterFileName(opt.clusterFile);
|
||||
try {
|
||||
ccf = makeReference<ClusterConnectionFile>(resolvedClusterFile.first);
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_operation_cancelled) {
|
||||
throw;
|
||||
}
|
||||
fprintf(stderr, "%s\n", ClusterConnectionFile::getErrorString(resolvedClusterFile, e).c_str());
|
||||
return 1;
|
||||
}
|
||||
|
||||
// Ordinarily, this is done when the network is run. However, network thread should be set before TraceEvents are
|
||||
// logged. This thread will eventually run the network, so call it now.
|
||||
TraceEvent::setNetworkThread();
|
||||
|
@ -1987,7 +1973,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<int> runCli(CLIOptions opt) {
|
||||
ACTOR Future<int> runCli(CLIOptions opt, Reference<ClusterConnectionFile> ccf) {
|
||||
state LineNoise linenoise(
|
||||
[](std::string const& line, std::vector<std::string>& completions) { fdbcliCompCmd(line, completions); },
|
||||
[enabled = opt.cliHints](std::string const& line) -> LineNoise::Hint {
|
||||
|
@ -2051,7 +2037,7 @@ ACTOR Future<int> runCli(CLIOptions opt) {
|
|||
.GetLastError();
|
||||
}
|
||||
|
||||
state int result = wait(cli(opt, &linenoise));
|
||||
state int result = wait(cli(opt, &linenoise, ccf));
|
||||
|
||||
if (!historyFilename.empty()) {
|
||||
try {
|
||||
|
@ -2073,6 +2059,33 @@ ACTOR Future<Void> timeExit(double duration) {
|
|||
return Void();
|
||||
}
|
||||
|
||||
const char* checkTlsConfigAgainstCoordAddrs(const ClusterConnectionString& ccs) {
|
||||
// Resolve TLS config and inspect whether any of the certificate, key, ca bytes has been set
|
||||
extern TLSConfig tlsConfig;
|
||||
auto const loaded = tlsConfig.loadSync();
|
||||
const bool tlsConfigured =
|
||||
!loaded.getCertificateBytes().empty() || !loaded.getKeyBytes().empty() || !loaded.getCABytes().empty();
|
||||
int tlsAddrs = 0;
|
||||
int totalAddrs = 0;
|
||||
for (const auto& addr : ccs.coords) {
|
||||
if (addr.isTLS())
|
||||
tlsAddrs++;
|
||||
totalAddrs++;
|
||||
}
|
||||
for (const auto& host : ccs.hostnames) {
|
||||
if (host.isTLS)
|
||||
tlsAddrs++;
|
||||
totalAddrs++;
|
||||
}
|
||||
if (tlsConfigured && tlsAddrs == 0) {
|
||||
return "fdbcli is configured with TLS, but none of the coordinators have TLS addresses.";
|
||||
} else if (!tlsConfigured && tlsAddrs == totalAddrs) {
|
||||
return "fdbcli is not configured with TLS, but all of the coordinators have TLS addresses.";
|
||||
} else {
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
platformInit();
|
||||
Error::init();
|
||||
|
@ -2177,6 +2190,25 @@ int main(int argc, char** argv) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
Reference<ClusterConnectionFile> ccf;
|
||||
std::pair<std::string, bool> resolvedClusterFile = ClusterConnectionFile::lookupClusterFileName(opt.clusterFile);
|
||||
|
||||
try {
|
||||
ccf = makeReference<ClusterConnectionFile>(resolvedClusterFile.first);
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_operation_cancelled) {
|
||||
throw;
|
||||
}
|
||||
fprintf(stderr, "%s\n", ClusterConnectionFile::getErrorString(resolvedClusterFile, e).c_str());
|
||||
return 1;
|
||||
}
|
||||
|
||||
// Make sure that TLS configuration lines up with ":tls" prefix on coordinator addresses
|
||||
if (auto errorMsg = checkTlsConfigAgainstCoordAddrs(ccf->getConnectionString())) {
|
||||
fprintf(stderr, "ERROR: %s\n", errorMsg);
|
||||
return 1;
|
||||
}
|
||||
|
||||
try {
|
||||
API->selectApiVersion(opt.apiVersion);
|
||||
if (opt.useFutureProtocolVersion) {
|
||||
|
@ -2188,7 +2220,7 @@ int main(int argc, char** argv) {
|
|||
return opt.exit_code;
|
||||
}
|
||||
Future<Void> memoryUsageMonitor = startMemoryUsageMonitor(opt.memLimit);
|
||||
Future<int> cliFuture = runCli(opt);
|
||||
Future<int> cliFuture = runCli(opt, ccf);
|
||||
Future<Void> timeoutFuture = opt.exit_timeout ? timeExit(opt.exit_timeout) : Never();
|
||||
auto f = stopNetworkAfter(success(cliFuture) || timeoutFuture);
|
||||
API->runNetwork();
|
||||
|
|
|
@ -127,7 +127,10 @@ ACTOR Future<RangeResult> krmGetRangesUnaligned(Transaction* tr,
|
|||
|
||||
state GetRangeLimits limits(limit, limitBytes);
|
||||
limits.minRows = 2;
|
||||
RangeResult kv = wait(tr->getRange(lastLessOrEqual(withPrefix.begin), firstGreaterThan(withPrefix.end), limits));
|
||||
// wait to include the next highest row >= keys.end in the result, so since end is exclusive, we need +2 and
|
||||
// !orEqual
|
||||
RangeResult kv =
|
||||
wait(tr->getRange(lastLessOrEqual(withPrefix.begin), KeySelectorRef(withPrefix.end, false, +2), limits));
|
||||
|
||||
return krmDecodeRanges(mapPrefix, keys, kv, false);
|
||||
}
|
||||
|
@ -142,7 +145,10 @@ ACTOR Future<RangeResult> krmGetRangesUnaligned(Reference<ReadYourWritesTransact
|
|||
|
||||
state GetRangeLimits limits(limit, limitBytes);
|
||||
limits.minRows = 2;
|
||||
RangeResult kv = wait(tr->getRange(lastLessOrEqual(withPrefix.begin), firstGreaterThan(withPrefix.end), limits));
|
||||
// wait to include the next highest row >= keys.end in the result, so since end is exclusive, we need +2 and
|
||||
// !orEqual
|
||||
RangeResult kv =
|
||||
wait(tr->getRange(lastLessOrEqual(withPrefix.begin), KeySelectorRef(withPrefix.end, false, +2), limits));
|
||||
|
||||
return krmDecodeRanges(mapPrefix, keys, kv, false);
|
||||
}
|
||||
|
@ -323,17 +329,27 @@ TEST_CASE("/keyrangemap/decoderange/aligned") {
|
|||
StringRef keyD = StringRef(arena, LiteralStringRef("d"));
|
||||
StringRef keyE = StringRef(arena, LiteralStringRef("e"));
|
||||
StringRef keyAB = StringRef(arena, LiteralStringRef("ab"));
|
||||
StringRef keyAC = StringRef(arena, LiteralStringRef("ac"));
|
||||
StringRef keyCD = StringRef(arena, LiteralStringRef("cd"));
|
||||
|
||||
// Fake getRange() call.
|
||||
RangeResult kv;
|
||||
kv.push_back(arena, KeyValueRef(fullKeyA, keyA));
|
||||
kv.push_back(arena, KeyValueRef(fullKeyB, keyB));
|
||||
|
||||
// [A, AB(start), AC(start), B]
|
||||
RangeResult decodedRanges = krmDecodeRanges(prefix, KeyRangeRef(keyAB, keyAC), kv);
|
||||
ASSERT(decodedRanges.size() == 2);
|
||||
ASSERT(decodedRanges.front().key == keyAB);
|
||||
ASSERT(decodedRanges.front().value == keyA);
|
||||
ASSERT(decodedRanges.back().key == keyAC);
|
||||
ASSERT(decodedRanges.back().value == keyA);
|
||||
|
||||
kv.push_back(arena, KeyValueRef(fullKeyC, keyC));
|
||||
kv.push_back(arena, KeyValueRef(fullKeyD, keyD));
|
||||
|
||||
// [A, AB(start), B, C, CD(end), D]
|
||||
RangeResult decodedRanges = krmDecodeRanges(prefix, KeyRangeRef(keyAB, keyCD), kv);
|
||||
decodedRanges = krmDecodeRanges(prefix, KeyRangeRef(keyAB, keyCD), kv);
|
||||
ASSERT(decodedRanges.size() == 4);
|
||||
ASSERT(decodedRanges.front().key == keyAB);
|
||||
ASSERT(decodedRanges.front().value == keyA);
|
||||
|
@ -365,17 +381,27 @@ TEST_CASE("/keyrangemap/decoderange/unaligned") {
|
|||
StringRef keyD = StringRef(arena, LiteralStringRef("d"));
|
||||
StringRef keyE = StringRef(arena, LiteralStringRef("e"));
|
||||
StringRef keyAB = StringRef(arena, LiteralStringRef("ab"));
|
||||
StringRef keyAC = StringRef(arena, LiteralStringRef("ac"));
|
||||
StringRef keyCD = StringRef(arena, LiteralStringRef("cd"));
|
||||
|
||||
// Fake getRange() call.
|
||||
RangeResult kv;
|
||||
kv.push_back(arena, KeyValueRef(fullKeyA, keyA));
|
||||
kv.push_back(arena, KeyValueRef(fullKeyB, keyB));
|
||||
|
||||
// [A, AB(start), AC(start), B]
|
||||
RangeResult decodedRanges = krmDecodeRanges(prefix, KeyRangeRef(keyAB, keyAC), kv, false);
|
||||
ASSERT(decodedRanges.size() == 2);
|
||||
ASSERT(decodedRanges.front().key == keyA);
|
||||
ASSERT(decodedRanges.front().value == keyA);
|
||||
ASSERT(decodedRanges.back().key == keyB);
|
||||
ASSERT(decodedRanges.back().value == keyB);
|
||||
|
||||
kv.push_back(arena, KeyValueRef(fullKeyC, keyC));
|
||||
kv.push_back(arena, KeyValueRef(fullKeyD, keyD));
|
||||
|
||||
// [A, AB(start), B, C, CD(end), D]
|
||||
RangeResult decodedRanges = krmDecodeRanges(prefix, KeyRangeRef(keyAB, keyCD), kv, false);
|
||||
decodedRanges = krmDecodeRanges(prefix, KeyRangeRef(keyAB, keyCD), kv, false);
|
||||
ASSERT(decodedRanges.size() == 4);
|
||||
ASSERT(decodedRanges.front().key == keyA);
|
||||
ASSERT(decodedRanges.front().value == keyA);
|
||||
|
|
|
@ -2471,6 +2471,18 @@ void MultiVersionApi::setNetworkOptionInternal(FDBNetworkOptions::Option option,
|
|||
} else if (option == FDBNetworkOptions::FUTURE_VERSION_CLIENT_LIBRARY) {
|
||||
validateOption(value, true, false, false);
|
||||
addExternalLibrary(abspath(value.get().toString()), true);
|
||||
} else if (option == FDBNetworkOptions::TRACE_FILE_IDENTIFIER) {
|
||||
validateOption(value, true, false, true);
|
||||
traceFileIdentifier = value.get().toString();
|
||||
{
|
||||
MutexHolder holder(lock);
|
||||
// Forward the option unmodified only to the the local client and let it validate it.
|
||||
// While for external clients the trace file identifiers are determined in setupNetwork
|
||||
localClient->api->setNetworkOption(option, value);
|
||||
}
|
||||
} else if (option == FDBNetworkOptions::TRACE_SHARE_AMONG_CLIENT_THREADS) {
|
||||
validateOption(value, false, true);
|
||||
traceShareBaseNameAmongThreads = true;
|
||||
} else {
|
||||
forwardOption = true;
|
||||
}
|
||||
|
@ -2514,9 +2526,13 @@ void MultiVersionApi::setupNetwork() {
|
|||
// Copy external lib for each thread
|
||||
if (externalClients.count(filename) == 0) {
|
||||
externalClients[filename] = {};
|
||||
for (const auto& tmp : copyExternalLibraryPerThread(path)) {
|
||||
auto libCopies = copyExternalLibraryPerThread(path);
|
||||
for (int idx = 0; idx < libCopies.size(); ++idx) {
|
||||
externalClients[filename].push_back(Reference<ClientInfo>(
|
||||
new ClientInfo(new DLApi(tmp.first, tmp.second /*unlink on load*/), path, useFutureVersion)));
|
||||
new ClientInfo(new DLApi(libCopies[idx].first, libCopies[idx].second /*unlink on load*/),
|
||||
path,
|
||||
useFutureVersion,
|
||||
idx)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2562,13 +2578,23 @@ void MultiVersionApi::setupNetwork() {
|
|||
client->loadVersion();
|
||||
});
|
||||
|
||||
std::string baseTraceFileId;
|
||||
if (apiVersion.hasTraceFileIdentifier()) {
|
||||
// TRACE_FILE_IDENTIFIER option is supported since 6.3
|
||||
baseTraceFileId = traceFileIdentifier.empty() ? format("%d", getpid()) : traceFileIdentifier;
|
||||
}
|
||||
|
||||
MutexHolder holder(lock);
|
||||
runOnExternalClientsAllThreads([this, transportId](Reference<ClientInfo> client) {
|
||||
runOnExternalClientsAllThreads([this, transportId, baseTraceFileId](Reference<ClientInfo> client) {
|
||||
for (auto option : options) {
|
||||
client->api->setNetworkOption(option.first, option.second.castTo<StringRef>());
|
||||
}
|
||||
client->api->setNetworkOption(FDBNetworkOptions::EXTERNAL_CLIENT_TRANSPORT_ID, std::to_string(transportId));
|
||||
|
||||
if (!baseTraceFileId.empty()) {
|
||||
client->api->setNetworkOption(
|
||||
FDBNetworkOptions::TRACE_FILE_IDENTIFIER,
|
||||
traceShareBaseNameAmongThreads ? baseTraceFileId : client->getTraceFileIdentifier(baseTraceFileId));
|
||||
}
|
||||
client->api->setupNetwork();
|
||||
});
|
||||
|
||||
|
@ -2607,21 +2633,17 @@ void MultiVersionApi::runNetwork() {
|
|||
|
||||
std::vector<THREAD_HANDLE> handles;
|
||||
if (!bypassMultiClientApi) {
|
||||
for (int threadNum = 0; threadNum < threadCount; threadNum++) {
|
||||
runOnExternalClients(threadNum, [&handles, threadNum](Reference<ClientInfo> client) {
|
||||
if (client->external) {
|
||||
std::string threadName = format("fdb-%s-%d", client->releaseVersion.c_str(), threadNum);
|
||||
if (threadName.size() > 15) {
|
||||
threadName = format("fdb-%s", client->releaseVersion.c_str());
|
||||
if (threadName.size() > 15) {
|
||||
threadName = "fdb-external";
|
||||
}
|
||||
}
|
||||
handles.push_back(
|
||||
g_network->startThread(&runNetworkThread, client.getPtr(), 0, threadName.c_str()));
|
||||
runOnExternalClientsAllThreads([&handles](Reference<ClientInfo> client) {
|
||||
ASSERT(client->external);
|
||||
std::string threadName = format("fdb-%s-%d", client->releaseVersion.c_str(), client->threadIndex);
|
||||
if (threadName.size() > 15) {
|
||||
threadName = format("fdb-%s", client->releaseVersion.c_str());
|
||||
if (threadName.size() > 15) {
|
||||
threadName = "fdb-external";
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
handles.push_back(g_network->startThread(&runNetworkThread, client.getPtr(), 0, threadName.c_str()));
|
||||
});
|
||||
}
|
||||
|
||||
localClient->api->runNetwork();
|
||||
|
@ -2911,7 +2933,7 @@ void MultiVersionApi::loadEnvironmentVariableNetworkOptions() {
|
|||
MultiVersionApi::MultiVersionApi()
|
||||
: callbackOnMainThread(true), localClientDisabled(false), networkStartSetup(false), networkSetup(false),
|
||||
bypassMultiClientApi(false), externalClient(false), apiVersion(0), threadCount(0), tmpDir("/tmp"),
|
||||
envOptionsLoaded(false) {}
|
||||
traceShareBaseNameAmongThreads(false), envOptionsLoaded(false) {}
|
||||
|
||||
MultiVersionApi* MultiVersionApi::api = new MultiVersionApi();
|
||||
|
||||
|
@ -2948,6 +2970,12 @@ bool ClientInfo::canReplace(Reference<ClientInfo> other) const {
|
|||
return !protocolVersion.isCompatible(other->protocolVersion);
|
||||
}
|
||||
|
||||
std::string ClientInfo::getTraceFileIdentifier(const std::string& baseIdentifier) {
|
||||
std::string versionStr = releaseVersion;
|
||||
std::replace(versionStr.begin(), versionStr.end(), '.', '_');
|
||||
return format("%s_v%st%d", baseIdentifier.c_str(), versionStr.c_str(), threadIndex);
|
||||
}
|
||||
|
||||
// UNIT TESTS
|
||||
TEST_CASE("/fdbclient/multiversionclient/EnvironmentVariableParsing") {
|
||||
auto vals = parseOptionValues("a");
|
||||
|
|
|
@ -1466,13 +1466,11 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
|
|||
bgGranulesPerRequest(1000), outstandingWatches(0), sharedStatePtr(nullptr), lastGrvTime(0.0), cachedReadVersion(0),
|
||||
lastRkBatchThrottleTime(0.0), lastRkDefaultThrottleTime(0.0), lastProxyRequestTime(0.0),
|
||||
transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor),
|
||||
coordinator(coordinator), mvCacheInsertLocation(0), healthMetricsLastUpdated(0),
|
||||
coordinator(coordinator), apiVersion(_apiVersion), mvCacheInsertLocation(0), healthMetricsLastUpdated(0),
|
||||
detailedHealthMetricsLastUpdated(0), smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
|
||||
specialKeySpace(std::make_unique<SpecialKeySpace>(specialKeys.begin, specialKeys.end, /* test */ false)),
|
||||
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {
|
||||
|
||||
apiVersion = ApiVersion(_apiVersion);
|
||||
|
||||
dbId = deterministicRandom()->randomUniqueID();
|
||||
|
||||
TraceEvent("DatabaseContextCreated", dbId).backtrace();
|
||||
|
@ -9988,31 +9986,32 @@ ACTOR Future<bool> setBlobRangeActor(Reference<DatabaseContext> cx, KeyRange ran
|
|||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(db);
|
||||
|
||||
state Value value = active ? blobRangeActive : blobRangeInactive;
|
||||
|
||||
loop {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
|
||||
state Standalone<VectorRef<KeyRangeRef>> startBlobRanges = wait(getBlobRanges(tr, range, 10));
|
||||
state Standalone<VectorRef<KeyRangeRef>> endBlobRanges =
|
||||
wait(getBlobRanges(tr, KeyRangeRef(range.end, keyAfter(range.end)), 10));
|
||||
Standalone<VectorRef<KeyRangeRef>> startBlobRanges = wait(getBlobRanges(tr, range, 1));
|
||||
|
||||
if (active) {
|
||||
// Idempotent request.
|
||||
if (!startBlobRanges.empty() && !endBlobRanges.empty()) {
|
||||
return startBlobRanges.front().begin == range.begin && endBlobRanges.front().end == range.end;
|
||||
if (!startBlobRanges.empty()) {
|
||||
return startBlobRanges.front().begin == range.begin && startBlobRanges.front().end == range.end;
|
||||
}
|
||||
} else {
|
||||
// An unblobbify request must be aligned to boundaries.
|
||||
// It is okay to unblobbify multiple regions all at once.
|
||||
if (startBlobRanges.empty() && endBlobRanges.empty()) {
|
||||
if (startBlobRanges.empty()) {
|
||||
// already unblobbified
|
||||
return true;
|
||||
} else if (startBlobRanges.front().begin != range.begin) {
|
||||
// If there is a blob at the beginning of the range and it isn't aligned
|
||||
return false;
|
||||
}
|
||||
// If there is a blob at the beginning of the range and it isn't aligned,
|
||||
// or there is a blob range that begins before the end of the range, then fail.
|
||||
if ((!startBlobRanges.empty() && startBlobRanges.front().begin != range.begin) ||
|
||||
(!endBlobRanges.empty() && endBlobRanges.front().begin < range.end)) {
|
||||
// if blob range does start at the specified, key, we need to make sure the end of also a boundary of a
|
||||
// blob range
|
||||
Optional<Value> endPresent = wait(tr->get(range.end.withPrefix(blobRangeKeys.begin)));
|
||||
if (!endPresent.present()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -10021,10 +10020,6 @@ ACTOR Future<bool> setBlobRangeActor(Reference<DatabaseContext> cx, KeyRange ran
|
|||
// This is not coalescing because we want to keep each range logically separate.
|
||||
wait(krmSetRange(tr, blobRangeKeys.begin, range, value));
|
||||
wait(tr->commit());
|
||||
printf("Successfully updated blob range [%s - %s) to %s\n",
|
||||
range.begin.printable().c_str(),
|
||||
range.end.printable().c_str(),
|
||||
value.printable().c_str());
|
||||
return true;
|
||||
} catch (Error& e) {
|
||||
wait(tr->onError(e));
|
||||
|
|
|
@ -776,17 +776,22 @@ struct ClientInfo : ClientDesc, ThreadSafeReferenceCounted<ClientInfo> {
|
|||
IClientApi* api;
|
||||
bool failed;
|
||||
std::atomic_bool initialized;
|
||||
int threadIndex;
|
||||
std::vector<std::pair<void (*)(void*), void*>> threadCompletionHooks;
|
||||
|
||||
ClientInfo()
|
||||
: ClientDesc(std::string(), false, false), protocolVersion(0), api(nullptr), failed(true), initialized(false) {}
|
||||
: ClientDesc(std::string(), false, false), protocolVersion(0), api(nullptr), failed(true), initialized(false),
|
||||
threadIndex(0) {}
|
||||
ClientInfo(IClientApi* api)
|
||||
: ClientDesc("internal", false, false), protocolVersion(0), api(api), failed(false), initialized(false) {}
|
||||
ClientInfo(IClientApi* api, std::string libPath, bool useFutureVersion)
|
||||
: ClientDesc(libPath, true, useFutureVersion), protocolVersion(0), api(api), failed(false), initialized(false) {}
|
||||
: ClientDesc("internal", false, false), protocolVersion(0), api(api), failed(false), initialized(false),
|
||||
threadIndex(0) {}
|
||||
ClientInfo(IClientApi* api, std::string libPath, bool useFutureVersion, int threadIndex)
|
||||
: ClientDesc(libPath, true, useFutureVersion), protocolVersion(0), api(api), failed(false), initialized(false),
|
||||
threadIndex(threadIndex) {}
|
||||
|
||||
void loadVersion();
|
||||
bool canReplace(Reference<ClientInfo> other) const;
|
||||
std::string getTraceFileIdentifier(const std::string& baseIdentifier);
|
||||
};
|
||||
|
||||
class MultiVersionApi;
|
||||
|
@ -1106,6 +1111,8 @@ private:
|
|||
int nextThread = 0;
|
||||
int threadCount;
|
||||
std::string tmpDir;
|
||||
bool traceShareBaseNameAmongThreads;
|
||||
std::string traceFileIdentifier;
|
||||
|
||||
Mutex lock;
|
||||
std::vector<std::pair<FDBNetworkOptions::Option, Optional<Standalone<StringRef>>>> options;
|
||||
|
|
|
@ -50,8 +50,7 @@ struct SpanContext {
|
|||
SpanContext() : traceID(UID()), spanID(0), m_Flags(TraceFlags::unsampled) {}
|
||||
SpanContext(UID traceID, uint64_t spanID, TraceFlags flags) : traceID(traceID), spanID(spanID), m_Flags(flags) {}
|
||||
SpanContext(UID traceID, uint64_t spanID) : traceID(traceID), spanID(spanID), m_Flags(TraceFlags::unsampled) {}
|
||||
SpanContext(Arena arena, const SpanContext& span)
|
||||
: traceID(span.traceID), spanID(span.spanID), m_Flags(span.m_Flags) {}
|
||||
SpanContext(const SpanContext& span) = default;
|
||||
bool isSampled() const { return (m_Flags & TraceFlags::sampled) == TraceFlags::sampled; }
|
||||
std::string toString() const { return format("%016llx%016llx%016llx", traceID.first(), traceID.second(), spanID); };
|
||||
bool isValid() const { return traceID.first() != 0 && traceID.second() != 0 && spanID != 0; }
|
||||
|
@ -62,6 +61,9 @@ struct SpanContext {
|
|||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
struct flow_ref<SpanContext> : std::false_type {};
|
||||
|
||||
// Span
|
||||
//
|
||||
// Span is a tracing implementation which, for the most part, complies with the W3C Trace Context specification
|
||||
|
@ -155,7 +157,7 @@ public:
|
|||
// We've determined for initial tracing release, spans with only a location will not be traced.
|
||||
// Generally these are for background processes, some are called infrequently, while others may be high volume.
|
||||
// TODO: review and address in subsequent PRs.
|
||||
Span(const Location& location) : location(location), begin(g_network->now()) {}
|
||||
explicit Span(const Location& location) : Span(location, SpanContext()) {}
|
||||
|
||||
Span(const Span&) = delete;
|
||||
Span(Span&& o) {
|
||||
|
|
|
@ -57,6 +57,8 @@ description is not currently required but encouraged.
|
|||
<Option name="trace_file_identifier" code="36"
|
||||
paramType="String" paramDescription="The identifier that will be part of all trace file names"
|
||||
description="Once provided, this string will be used to replace the port/PID in the log file names." />
|
||||
<Option name="trace_share_among_client_threads" code="37"
|
||||
description="Use the same base trace file name for all client threads as it did before version 7.2. The current default behavior is to use distinct trace file names for client threads by including their version and thread index." />
|
||||
<Option name="trace_partial_file_suffix" code="39"
|
||||
paramType="String" paramDescription="Append this suffix to partially written log files. When a log file is complete, it is renamed to remove the suffix. No separator is added between the file and the suffix. If you want to add a file extension, you should include the separator - e.g. '.tmp' instead of 'tmp' to add the 'tmp' extension."
|
||||
description="Set file suffix for partially written log files." />
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
if(NOT WIN32)
|
||||
add_flow_target(EXECUTABLE NAME authz_tls_unittest SRCS AuthzTlsTest.actor.cpp)
|
||||
target_link_libraries(authz_tls_unittest PRIVATE flow fdbrpc fmt::fmt)
|
||||
add_test(NAME authorization_tls_unittest
|
||||
COMMAND $<TARGET_FILE:authz_tls_unittest>)
|
||||
if(NOT OPEN_FOR_IDE)
|
||||
add_test(NAME authorization_tls_unittest
|
||||
COMMAND $<TARGET_FILE:authz_tls_unittest>)
|
||||
endif()
|
||||
endif()
|
||||
|
|
|
@ -18,13 +18,17 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/StorageServerInterface.h"
|
||||
#include "fdbrpc/FailureMonitor.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbserver/DataDistribution.actor.h"
|
||||
#include "fdbserver/DDSharedContext.h"
|
||||
#include "fdbserver/TenantCache.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
#include "flow/ActorCollection.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/FastRef.h"
|
||||
#include "flow/Trace.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
@ -121,6 +125,8 @@ struct DataDistributionTracker : public IDDShardTracker {
|
|||
}
|
||||
};
|
||||
|
||||
Optional<Reference<TenantCache>> ddTenantCache;
|
||||
|
||||
DataDistributionTracker(Database cx,
|
||||
UID distributorId,
|
||||
Promise<Void> const& readyToStart,
|
||||
|
@ -129,12 +135,13 @@ struct DataDistributionTracker : public IDDShardTracker {
|
|||
Reference<PhysicalShardCollection> physicalShardCollection,
|
||||
Reference<AsyncVar<bool>> anyZeroHealthyTeams,
|
||||
KeyRangeMap<ShardTrackedData>* shards,
|
||||
bool* trackerCancelled)
|
||||
bool* trackerCancelled,
|
||||
Optional<Reference<TenantCache>> ddTenantCache)
|
||||
: IDDShardTracker(), cx(cx), distributorId(distributorId), shards(shards), sizeChanges(false),
|
||||
systemSizeEstimate(0), dbSizeEstimate(new AsyncVar<int64_t>()), maxShardSize(new AsyncVar<Optional<int64_t>>()),
|
||||
output(output), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure),
|
||||
physicalShardCollection(physicalShardCollection), readyToStart(readyToStart),
|
||||
anyZeroHealthyTeams(anyZeroHealthyTeams), trackerCancelled(trackerCancelled) {}
|
||||
anyZeroHealthyTeams(anyZeroHealthyTeams), trackerCancelled(trackerCancelled), ddTenantCache(ddTenantCache) {}
|
||||
|
||||
~DataDistributionTracker() override {
|
||||
*trackerCancelled = true;
|
||||
|
@ -501,6 +508,375 @@ private:
|
|||
Promise<Void> cleared;
|
||||
};
|
||||
|
||||
std::string describeSplit(KeyRange keys, Standalone<VectorRef<KeyRef>>& splitKeys) {
|
||||
std::string s;
|
||||
s += "[" + keys.begin.toString() + ", " + keys.end.toString() + ") -> ";
|
||||
|
||||
for (auto& sk : splitKeys) {
|
||||
s += sk.printable() + " ";
|
||||
}
|
||||
|
||||
return s;
|
||||
}
|
||||
void traceSplit(KeyRange keys, Standalone<VectorRef<KeyRef>>& splitKeys) {
|
||||
auto s = describeSplit(keys, splitKeys);
|
||||
TraceEvent(SevInfo, "ExecutingShardSplit").detail("AtKeys", s);
|
||||
}
|
||||
|
||||
void executeShardSplit(DataDistributionTracker* self,
|
||||
KeyRange keys,
|
||||
Standalone<VectorRef<KeyRef>> splitKeys,
|
||||
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize,
|
||||
bool relocate,
|
||||
RelocateReason reason) {
|
||||
|
||||
int numShards = splitKeys.size() - 1;
|
||||
ASSERT(numShards > 1);
|
||||
|
||||
int skipRange = deterministicRandom()->randomInt(0, numShards);
|
||||
|
||||
auto s = describeSplit(keys, splitKeys);
|
||||
TraceEvent(SevInfo, "ExecutingShardSplit").suppressFor(0.5).detail("Splitting", s).detail("NumShards", numShards);
|
||||
|
||||
// The queue can't deal with RelocateShard requests which split an existing shard into three pieces, so
|
||||
// we have to send the unskipped ranges in this order (nibbling in from the edges of the old range)
|
||||
for (int i = 0; i < skipRange; i++)
|
||||
restartShardTrackers(self, KeyRangeRef(splitKeys[i], splitKeys[i + 1]));
|
||||
restartShardTrackers(self, KeyRangeRef(splitKeys[skipRange], splitKeys[skipRange + 1]));
|
||||
for (int i = numShards - 1; i > skipRange; i--)
|
||||
restartShardTrackers(self, KeyRangeRef(splitKeys[i], splitKeys[i + 1]));
|
||||
|
||||
for (int i = 0; i < skipRange; i++) {
|
||||
KeyRangeRef r(splitKeys[i], splitKeys[i + 1]);
|
||||
self->shardsAffectedByTeamFailure->defineShard(r);
|
||||
if (relocate) {
|
||||
self->output.send(RelocateShard(r, DataMovementReason::SPLIT_SHARD, reason));
|
||||
}
|
||||
}
|
||||
for (int i = numShards - 1; i > skipRange; i--) {
|
||||
KeyRangeRef r(splitKeys[i], splitKeys[i + 1]);
|
||||
self->shardsAffectedByTeamFailure->defineShard(r);
|
||||
if (relocate) {
|
||||
self->output.send(RelocateShard(r, DataMovementReason::SPLIT_SHARD, reason));
|
||||
}
|
||||
}
|
||||
|
||||
self->sizeChanges.add(changeSizes(self, keys, shardSize->get().get().metrics.bytes));
|
||||
}
|
||||
|
||||
struct RangeToSplit {
|
||||
RangeMap<Standalone<StringRef>, ShardTrackedData, KeyRangeRef>::iterator shard;
|
||||
Standalone<VectorRef<KeyRef>> faultLines;
|
||||
|
||||
RangeToSplit(RangeMap<Standalone<StringRef>, ShardTrackedData, KeyRangeRef>::iterator shard,
|
||||
Standalone<VectorRef<KeyRef>> faultLines)
|
||||
: shard(shard), faultLines(faultLines) {}
|
||||
};
|
||||
|
||||
Standalone<VectorRef<KeyRef>> findShardFaultLines(KeyRef shardBegin,
|
||||
KeyRef shardEnd,
|
||||
KeyRef tenantBegin,
|
||||
KeyRef tenantEnd) {
|
||||
Standalone<VectorRef<KeyRef>> faultLines;
|
||||
|
||||
ASSERT((shardBegin < tenantBegin && shardEnd > tenantBegin) || (shardBegin < tenantEnd && shardEnd > tenantEnd));
|
||||
|
||||
faultLines.push_back_deep(faultLines.arena(), shardBegin);
|
||||
if (shardBegin < tenantBegin && shardEnd > tenantBegin) {
|
||||
faultLines.push_back_deep(faultLines.arena(), tenantBegin);
|
||||
}
|
||||
if (shardBegin < tenantEnd && shardEnd > tenantEnd) {
|
||||
faultLines.push_back_deep(faultLines.arena(), tenantEnd);
|
||||
}
|
||||
faultLines.push_back_deep(faultLines.arena(), shardEnd);
|
||||
|
||||
return faultLines;
|
||||
}
|
||||
|
||||
std::vector<RangeToSplit> findTenantShardBoundaries(KeyRangeMap<ShardTrackedData>* shards, KeyRange tenantKeys) {
|
||||
|
||||
std::vector<RangeToSplit> result;
|
||||
auto shardContainingTenantStart = shards->rangeContaining(tenantKeys.begin);
|
||||
auto shardContainingTenantEnd = shards->rangeContainingKeyBefore(tenantKeys.end);
|
||||
|
||||
// same shard
|
||||
if (shardContainingTenantStart == shardContainingTenantEnd) {
|
||||
// If shard boundaries are not aligned with tenantKeys
|
||||
if (shardContainingTenantStart.begin() != tenantKeys.begin ||
|
||||
shardContainingTenantStart.end() != tenantKeys.end) {
|
||||
|
||||
auto startShardSize = shardContainingTenantStart->value().stats;
|
||||
|
||||
if (startShardSize->get().present()) {
|
||||
auto faultLines = findShardFaultLines(shardContainingTenantStart->begin(),
|
||||
shardContainingTenantStart->end(),
|
||||
tenantKeys.begin,
|
||||
tenantKeys.end);
|
||||
result.emplace_back(shardContainingTenantStart, faultLines);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
auto startShardSize = shardContainingTenantStart->value().stats;
|
||||
auto endShardSize = shardContainingTenantEnd->value().stats;
|
||||
|
||||
if (startShardSize->get().present() && endShardSize->get().present()) {
|
||||
if (shardContainingTenantStart->begin() != tenantKeys.begin) {
|
||||
auto faultLines = findShardFaultLines(shardContainingTenantStart->begin(),
|
||||
shardContainingTenantStart->end(),
|
||||
tenantKeys.begin,
|
||||
tenantKeys.end);
|
||||
result.emplace_back(shardContainingTenantStart, faultLines);
|
||||
}
|
||||
|
||||
if (shardContainingTenantEnd->end() != tenantKeys.end) {
|
||||
auto faultLines = findShardFaultLines(shardContainingTenantEnd->begin(),
|
||||
shardContainingTenantEnd->end(),
|
||||
tenantKeys.begin,
|
||||
tenantKeys.end);
|
||||
result.emplace_back(shardContainingTenantEnd, faultLines);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
bool faultLinesMatch(std::vector<RangeToSplit>& ranges, std::vector<std::vector<KeyRef>>& expectedFaultLines) {
|
||||
if (ranges.size() != expectedFaultLines.size()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (auto& range : ranges) {
|
||||
KeyRangeRef keys = KeyRangeRef(range.shard->begin(), range.shard->end());
|
||||
traceSplit(keys, range.faultLines);
|
||||
}
|
||||
|
||||
for (int r = 0; r < ranges.size(); r++) {
|
||||
if (ranges[r].faultLines.size() != expectedFaultLines[r].size()) {
|
||||
return false;
|
||||
}
|
||||
for (int fl = 0; fl < ranges[r].faultLines.size(); fl++) {
|
||||
if (ranges[r].faultLines[fl] != expectedFaultLines[r][fl]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
TEST_CASE("/DataDistribution/Tenant/SingleShardSplit") {
|
||||
wait(Future<Void>(Void()));
|
||||
ShardTrackedData data;
|
||||
ShardMetrics sm(StorageMetrics(), now(), 1);
|
||||
data.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
|
||||
|
||||
KeyRangeMap<ShardTrackedData> shards;
|
||||
|
||||
KeyRef begin = "a"_sr, end = "f"_sr;
|
||||
KeyRangeRef k(begin, end);
|
||||
shards.insert(k, data);
|
||||
|
||||
KeyRangeRef tenantKeys("b"_sr, "c"_sr);
|
||||
|
||||
data.stats->set(sm);
|
||||
|
||||
std::vector<RangeToSplit> result = findTenantShardBoundaries(&shards, tenantKeys);
|
||||
|
||||
std::vector<std::vector<KeyRef>> expectedFaultLines = { { "a"_sr, "b"_sr, "c"_sr, "f"_sr } };
|
||||
ASSERT(faultLinesMatch(result, expectedFaultLines));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/DataDistribution/Tenant/SingleShardTenantAligned") {
|
||||
wait(Future<Void>(Void()));
|
||||
ShardTrackedData data;
|
||||
ShardMetrics sm(StorageMetrics(), now(), 1);
|
||||
data.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
|
||||
|
||||
KeyRangeMap<ShardTrackedData> shards;
|
||||
|
||||
KeyRef begin = "a"_sr, end = "f"_sr;
|
||||
KeyRangeRef k(begin, end);
|
||||
shards.insert(k, data);
|
||||
|
||||
KeyRangeRef tenantKeys("a"_sr, "f"_sr);
|
||||
|
||||
data.stats->set(sm);
|
||||
|
||||
std::vector<RangeToSplit> result = findTenantShardBoundaries(&shards, tenantKeys);
|
||||
|
||||
std::vector<std::vector<KeyRef>> expectedFaultLines = {};
|
||||
ASSERT(faultLinesMatch(result, expectedFaultLines));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/DataDistribution/Tenant/SingleShardTenantAlignedAtStart") {
|
||||
wait(Future<Void>(Void()));
|
||||
ShardTrackedData data;
|
||||
ShardMetrics sm(StorageMetrics(), now(), 1);
|
||||
data.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
|
||||
|
||||
KeyRangeMap<ShardTrackedData> shards;
|
||||
|
||||
KeyRef begin = "a"_sr, end = "f"_sr;
|
||||
KeyRangeRef k(begin, end);
|
||||
shards.insert(k, data);
|
||||
|
||||
KeyRangeRef tenantKeys("a"_sr, "d"_sr);
|
||||
|
||||
data.stats->set(sm);
|
||||
|
||||
std::vector<RangeToSplit> result = findTenantShardBoundaries(&shards, tenantKeys);
|
||||
|
||||
std::vector<std::vector<KeyRef>> expectedFaultLines = { { "a"_sr, "d"_sr, "f"_sr } };
|
||||
ASSERT(faultLinesMatch(result, expectedFaultLines));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/DataDistribution/Tenant/SingleShardTenantAlignedAtEnd") {
|
||||
wait(Future<Void>(Void()));
|
||||
ShardTrackedData data;
|
||||
ShardMetrics sm(StorageMetrics(), now(), 1);
|
||||
data.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
|
||||
|
||||
KeyRangeMap<ShardTrackedData> shards;
|
||||
|
||||
KeyRef begin = "a"_sr, end = "f"_sr;
|
||||
KeyRangeRef k(begin, end);
|
||||
shards.insert(k, data);
|
||||
|
||||
KeyRangeRef tenantKeys("b"_sr, "f"_sr);
|
||||
|
||||
data.stats->set(sm);
|
||||
|
||||
std::vector<RangeToSplit> result = findTenantShardBoundaries(&shards, tenantKeys);
|
||||
|
||||
std::vector<std::vector<KeyRef>> expectedFaultLines = { { "a"_sr, "b"_sr, "f"_sr } };
|
||||
ASSERT(faultLinesMatch(result, expectedFaultLines));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/DataDistribution/Tenant/DoubleShardSplit") {
|
||||
wait(Future<Void>(Void()));
|
||||
ShardTrackedData data1, data2;
|
||||
ShardMetrics sm(StorageMetrics(), now(), 1);
|
||||
data1.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
|
||||
data2.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
|
||||
|
||||
KeyRangeMap<ShardTrackedData> shards;
|
||||
|
||||
KeyRef begin1 = "a"_sr, end1 = "c"_sr;
|
||||
KeyRef begin2 = "d"_sr, end2 = "f"_sr;
|
||||
KeyRangeRef k1(begin1, end1);
|
||||
KeyRangeRef k2(begin2, end2);
|
||||
|
||||
shards.insert(k1, data1);
|
||||
shards.insert(k2, data2);
|
||||
|
||||
KeyRangeRef tenantKeys("b"_sr, "e"_sr);
|
||||
|
||||
data1.stats->set(sm);
|
||||
data2.stats->set(sm);
|
||||
|
||||
std::vector<RangeToSplit> result = findTenantShardBoundaries(&shards, tenantKeys);
|
||||
|
||||
for (auto& range : result) {
|
||||
KeyRangeRef keys = KeyRangeRef(range.shard->begin(), range.shard->end());
|
||||
traceSplit(keys, range.faultLines);
|
||||
}
|
||||
|
||||
std::vector<std::vector<KeyRef>> expectedFaultLines = { { "a"_sr, "b"_sr, "c"_sr }, { "d"_sr, "e"_sr, "f"_sr } };
|
||||
ASSERT(faultLinesMatch(result, expectedFaultLines));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/DataDistribution/Tenant/DoubleShardTenantAlignedAtStart") {
|
||||
wait(Future<Void>(Void()));
|
||||
ShardTrackedData data1, data2;
|
||||
ShardMetrics sm(StorageMetrics(), now(), 1);
|
||||
data1.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
|
||||
data2.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
|
||||
|
||||
KeyRangeMap<ShardTrackedData> shards;
|
||||
|
||||
KeyRef begin1 = "a"_sr, end1 = "c"_sr;
|
||||
KeyRef begin2 = "d"_sr, end2 = "f"_sr;
|
||||
KeyRangeRef k1(begin1, end1);
|
||||
KeyRangeRef k2(begin2, end2);
|
||||
|
||||
shards.insert(k1, data1);
|
||||
shards.insert(k2, data2);
|
||||
|
||||
KeyRangeRef tenantKeys("a"_sr, "e"_sr);
|
||||
|
||||
data1.stats->set(sm);
|
||||
data2.stats->set(sm);
|
||||
|
||||
std::vector<RangeToSplit> result = findTenantShardBoundaries(&shards, tenantKeys);
|
||||
|
||||
std::vector<std::vector<KeyRef>> expectedFaultLines = { { "d"_sr, "e"_sr, "f"_sr } };
|
||||
ASSERT(faultLinesMatch(result, expectedFaultLines));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/DataDistribution/Tenant/DoubleShardTenantAlignedAtEnd") {
|
||||
wait(Future<Void>(Void()));
|
||||
ShardTrackedData data1, data2;
|
||||
ShardMetrics sm(StorageMetrics(), now(), 1);
|
||||
data1.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
|
||||
data2.stats = makeReference<AsyncVar<Optional<ShardMetrics>>>();
|
||||
|
||||
KeyRangeMap<ShardTrackedData> shards;
|
||||
|
||||
KeyRef begin1 = "a"_sr, end1 = "c"_sr;
|
||||
KeyRef begin2 = "d"_sr, end2 = "f"_sr;
|
||||
KeyRangeRef k1(begin1, end1);
|
||||
KeyRangeRef k2(begin2, end2);
|
||||
|
||||
shards.insert(k1, data1);
|
||||
shards.insert(k2, data2);
|
||||
|
||||
KeyRangeRef tenantKeys("b"_sr, "f"_sr);
|
||||
|
||||
data1.stats->set(sm);
|
||||
data2.stats->set(sm);
|
||||
|
||||
std::vector<RangeToSplit> result = findTenantShardBoundaries(&shards, tenantKeys);
|
||||
|
||||
std::vector<std::vector<KeyRef>> expectedFaultLines = { { "a"_sr, "b"_sr, "c"_sr } };
|
||||
ASSERT(faultLinesMatch(result, expectedFaultLines));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> tenantShardSplitter(DataDistributionTracker* self, KeyRange tenantKeys) {
|
||||
wait(Future<Void>(Void()));
|
||||
std::vector<RangeToSplit> rangesToSplit = findTenantShardBoundaries(self->shards, tenantKeys);
|
||||
|
||||
for (auto& range : rangesToSplit) {
|
||||
KeyRangeRef keys = KeyRangeRef(range.shard->begin(), range.shard->end());
|
||||
traceSplit(keys, range.faultLines);
|
||||
executeShardSplit(self, keys, range.faultLines, range.shard->value().stats, true, RelocateReason::TENANT_SPLIT);
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> tenantCreationHandling(DataDistributionTracker* self, TenantCacheTenantCreated req) {
|
||||
TraceEvent(SevInfo, "TenantCacheTenantCreated").detail("Begin", req.keys.begin).detail("End", req.keys.end);
|
||||
|
||||
wait(tenantShardSplitter(self, req.keys));
|
||||
req.reply.send(true);
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> shardSplitter(DataDistributionTracker* self,
|
||||
KeyRange keys,
|
||||
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize,
|
||||
|
@ -540,27 +916,7 @@ ACTOR Future<Void> shardSplitter(DataDistributionTracker* self,
|
|||
.detail("NumShards", numShards);
|
||||
|
||||
if (numShards > 1) {
|
||||
int skipRange = deterministicRandom()->randomInt(0, numShards);
|
||||
// The queue can't deal with RelocateShard requests which split an existing shard into three pieces, so
|
||||
// we have to send the unskipped ranges in this order (nibbling in from the edges of the old range)
|
||||
for (int i = 0; i < skipRange; i++)
|
||||
restartShardTrackers(self, KeyRangeRef(splitKeys[i], splitKeys[i + 1]));
|
||||
restartShardTrackers(self, KeyRangeRef(splitKeys[skipRange], splitKeys[skipRange + 1]));
|
||||
for (int i = numShards - 1; i > skipRange; i--)
|
||||
restartShardTrackers(self, KeyRangeRef(splitKeys[i], splitKeys[i + 1]));
|
||||
|
||||
for (int i = 0; i < skipRange; i++) {
|
||||
KeyRangeRef r(splitKeys[i], splitKeys[i + 1]);
|
||||
self->shardsAffectedByTeamFailure->defineShard(r);
|
||||
self->output.send(RelocateShard(r, DataMovementReason::SPLIT_SHARD, reason));
|
||||
}
|
||||
for (int i = numShards - 1; i > skipRange; i--) {
|
||||
KeyRangeRef r(splitKeys[i], splitKeys[i + 1]);
|
||||
self->shardsAffectedByTeamFailure->defineShard(r);
|
||||
self->output.send(RelocateShard(r, DataMovementReason::SPLIT_SHARD, reason));
|
||||
}
|
||||
|
||||
self->sizeChanges.add(changeSizes(self, keys, shardSize->get().get().metrics.bytes));
|
||||
executeShardSplit(self, keys, splitKeys, shardSize, true, reason);
|
||||
} else {
|
||||
wait(delay(1.0, TaskPriority::DataDistribution)); // In case the reason the split point was off was due to a
|
||||
// discrepancy between storage servers
|
||||
|
@ -579,6 +935,43 @@ ACTOR Future<Void> brokenPromiseToReady(Future<Void> f) {
|
|||
return Void();
|
||||
}
|
||||
|
||||
static bool shardMergeFeasible(DataDistributionTracker* self, KeyRange const& keys, KeyRangeRef adjRange) {
|
||||
bool honorTenantKeyspaceBoundaries = self->ddTenantCache.present();
|
||||
|
||||
if (!honorTenantKeyspaceBoundaries) {
|
||||
return true;
|
||||
}
|
||||
|
||||
Optional<Reference<TCTenantInfo>> tenantOwningRange = {};
|
||||
Optional<Reference<TCTenantInfo>> tenantOwningAdjRange = {};
|
||||
|
||||
tenantOwningRange = self->ddTenantCache.get()->tenantOwning(keys.begin);
|
||||
tenantOwningAdjRange = self->ddTenantCache.get()->tenantOwning(adjRange.begin);
|
||||
|
||||
if ((tenantOwningRange.present() != tenantOwningAdjRange.present()) ||
|
||||
(tenantOwningRange.present() && (tenantOwningRange != tenantOwningAdjRange))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool shardForwardMergeFeasible(DataDistributionTracker* self, KeyRange const& keys, KeyRangeRef nextRange) {
|
||||
if (keys.end == allKeys.end) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return shardMergeFeasible(self, keys, nextRange);
|
||||
}
|
||||
|
||||
static bool shardBackwardMergeFeasible(DataDistributionTracker* self, KeyRange const& keys, KeyRangeRef prevRange) {
|
||||
if (keys.begin == allKeys.begin) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return shardMergeFeasible(self, keys, prevRange);
|
||||
}
|
||||
|
||||
Future<Void> shardMerger(DataDistributionTracker* self,
|
||||
KeyRange const& keys,
|
||||
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize) {
|
||||
|
@ -594,6 +987,7 @@ Future<Void> shardMerger(DataDistributionTracker* self,
|
|||
int shardsMerged = 1;
|
||||
bool forwardComplete = false;
|
||||
KeyRangeRef merged;
|
||||
|
||||
StorageMetrics endingStats = shardSize->get().get().metrics;
|
||||
int shardCount = shardSize->get().get().shardCount;
|
||||
double lastLowBandwidthStartTime = shardSize->get().get().lastLowBandwidthStartTime;
|
||||
|
@ -614,7 +1008,14 @@ Future<Void> shardMerger(DataDistributionTracker* self,
|
|||
forwardComplete = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
++nextIter;
|
||||
if (!shardForwardMergeFeasible(self, keys, nextIter->range())) {
|
||||
--nextIter;
|
||||
forwardComplete = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
newMetrics = nextIter->value().stats->get();
|
||||
|
||||
// If going forward, give up when the next shard's stats are not yet present, or if the
|
||||
|
@ -629,6 +1030,11 @@ Future<Void> shardMerger(DataDistributionTracker* self,
|
|||
--prevIter;
|
||||
newMetrics = prevIter->value().stats->get();
|
||||
|
||||
if (!shardBackwardMergeFeasible(self, keys, prevIter->range())) {
|
||||
++prevIter;
|
||||
break;
|
||||
}
|
||||
|
||||
// If going backward, stop when the stats are not present or if the shard is already over the merge
|
||||
// bounds. If this check triggers right away (if we have not merged anything) then return a trigger
|
||||
// on the previous shard changing "size".
|
||||
|
@ -654,8 +1060,8 @@ Future<Void> shardMerger(DataDistributionTracker* self,
|
|||
shardsMerged++;
|
||||
|
||||
auto shardBounds = getShardSizeBounds(merged, maxShardSize);
|
||||
// If we just recently get the current shard's metrics (i.e., less than DD_LOW_BANDWIDTH_DELAY ago), it means
|
||||
// the shard's metric may not be stable yet. So we cannot continue merging in this direction.
|
||||
// If we just recently get the current shard's metrics (i.e., less than DD_LOW_BANDWIDTH_DELAY ago), it
|
||||
// means the shard's metric may not be stable yet. So we cannot continue merging in this direction.
|
||||
if (endingStats.bytes >= shardBounds.min.bytes || getBandwidthStatus(endingStats) != BandwidthStatusLow ||
|
||||
now() - lastLowBandwidthStartTime < SERVER_KNOBS->DD_LOW_BANDWIDTH_DELAY ||
|
||||
shardsMerged >= SERVER_KNOBS->DD_MERGE_LIMIT) {
|
||||
|
@ -682,6 +1088,10 @@ Future<Void> shardMerger(DataDistributionTracker* self,
|
|||
}
|
||||
}
|
||||
|
||||
if (shardsMerged == 1) {
|
||||
return brokenPromiseToReady(nextIter->value().stats->onChange());
|
||||
}
|
||||
|
||||
// restarting shard tracker will derefenced values in the shard map, so make a copy
|
||||
KeyRange mergeRange = merged;
|
||||
|
||||
|
@ -689,9 +1099,11 @@ Future<Void> shardMerger(DataDistributionTracker* self,
|
|||
// NewKeys: New key range after shards are merged;
|
||||
// EndingSize: The new merged shard size in bytes;
|
||||
// BatchedMerges: The number of shards merged. Each shard is defined in self->shards;
|
||||
// LastLowBandwidthStartTime: When does a shard's bandwidth status becomes BandwidthStatusLow. If a shard's status
|
||||
// LastLowBandwidthStartTime: When does a shard's bandwidth status becomes BandwidthStatusLow. If a shard's
|
||||
// status
|
||||
// becomes BandwidthStatusLow less than DD_LOW_BANDWIDTH_DELAY ago, the merging logic will stop at the shard;
|
||||
// ShardCount: The number of non-splittable shards that are merged. Each shard is defined in self->shards may have
|
||||
// ShardCount: The number of non-splittable shards that are merged. Each shard is defined in self->shards may
|
||||
// have
|
||||
// more than 1 shards.
|
||||
TraceEvent("RelocateShardMergeMetrics", self->distributorId)
|
||||
.detail("OldKeys", keys)
|
||||
|
@ -724,10 +1136,22 @@ ACTOR Future<Void> shardEvaluator(DataDistributionTracker* self,
|
|||
ShardSizeBounds shardBounds = getShardSizeBounds(keys, self->maxShardSize->get().get());
|
||||
StorageMetrics const& stats = shardSize->get().get().metrics;
|
||||
auto bandwidthStatus = getBandwidthStatus(stats);
|
||||
|
||||
bool sizeSplit = stats.bytes > shardBounds.max.bytes,
|
||||
writeSplit = bandwidthStatus == BandwidthStatusHigh && keys.begin < keyServersKeys.begin;
|
||||
bool shouldSplit = sizeSplit || writeSplit;
|
||||
bool shouldMerge = stats.bytes < shardBounds.min.bytes && bandwidthStatus == BandwidthStatusLow;
|
||||
|
||||
auto prevIter = self->shards->rangeContaining(keys.begin);
|
||||
if (keys.begin > allKeys.begin)
|
||||
--prevIter;
|
||||
|
||||
auto nextIter = self->shards->rangeContaining(keys.begin);
|
||||
if (keys.end < allKeys.end)
|
||||
++nextIter;
|
||||
|
||||
bool shouldMerge = stats.bytes < shardBounds.min.bytes && bandwidthStatus == BandwidthStatusLow &&
|
||||
(shardForwardMergeFeasible(self, keys, nextIter.range()) ||
|
||||
shardBackwardMergeFeasible(self, keys, prevIter.range()));
|
||||
|
||||
// Every invocation must set this or clear it
|
||||
if (shouldMerge && !self->anyZeroHealthyTeams->get()) {
|
||||
|
@ -796,8 +1220,8 @@ ACTOR Future<Void> shardTracker(DataDistributionTracker::SafeAccessor self,
|
|||
// Use the current known size to check for (and start) splits and merges.
|
||||
wait(shardEvaluator(self(), keys, shardSize, wantsToMerge));
|
||||
|
||||
// We could have a lot of actors being released from the previous wait at the same time. Immediately calling
|
||||
// delay(0) mitigates the resulting SlowTask
|
||||
// We could have a lot of actors being released from the previous wait at the same time. Immediately
|
||||
// calling delay(0) mitigates the resulting SlowTask
|
||||
wait(delay(0, TaskPriority::DataDistribution));
|
||||
}
|
||||
} catch (Error& e) {
|
||||
|
@ -1045,7 +1469,8 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
|
|||
Reference<AsyncVar<bool>> anyZeroHealthyTeams,
|
||||
UID distributorId,
|
||||
KeyRangeMap<ShardTrackedData>* shards,
|
||||
bool* trackerCancelled) {
|
||||
bool* trackerCancelled,
|
||||
Optional<Reference<TenantCache>> ddTenantCache) {
|
||||
state DataDistributionTracker self(cx,
|
||||
distributorId,
|
||||
readyToStart,
|
||||
|
@ -1054,7 +1479,8 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
|
|||
physicalShardCollection,
|
||||
anyZeroHealthyTeams,
|
||||
shards,
|
||||
trackerCancelled);
|
||||
trackerCancelled,
|
||||
ddTenantCache);
|
||||
state Future<Void> loggingTrigger = Void();
|
||||
state Future<Void> readHotDetect = readHotDetector(&self);
|
||||
state Reference<EventCacheHolder> ddTrackerStatsEventHolder = makeReference<EventCacheHolder>("DDTrackerStats");
|
||||
|
@ -1062,6 +1488,11 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
|
|||
wait(trackInitialShards(&self, initData));
|
||||
initData = Reference<InitialDataDistribution>();
|
||||
|
||||
state PromiseStream<TenantCacheTenantCreated> tenantCreationSignal;
|
||||
if (self.ddTenantCache.present()) {
|
||||
tenantCreationSignal = self.ddTenantCache.get()->tenantCreationSignal;
|
||||
}
|
||||
|
||||
loop choose {
|
||||
when(Promise<int64_t> req = waitNext(getAverageShardBytes)) { req.send(self.getAverageShardBytes()); }
|
||||
when(wait(loggingTrigger)) {
|
||||
|
@ -1083,6 +1514,11 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
|
|||
self.sizeChanges.add(fetchShardMetricsList(&self, req));
|
||||
}
|
||||
when(wait(self.sizeChanges.getResult())) {}
|
||||
|
||||
when(TenantCacheTenantCreated newTenant = waitNext(tenantCreationSignal.getFuture())) {
|
||||
self.sizeChanges.add(tenantCreationHandling(&self, newTenant));
|
||||
}
|
||||
|
||||
when(KeyRange req = waitNext(self.shardsAffectedByTeamFailure->restartShardTracker.getFuture())) {
|
||||
restartShardTrackers(&self, req);
|
||||
}
|
||||
|
|
|
@ -584,12 +584,6 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
|
|||
try {
|
||||
wait(DataDistributor::init(self));
|
||||
|
||||
state Reference<TenantCache> ddTenantCache;
|
||||
if (ddIsTenantAware) {
|
||||
ddTenantCache = makeReference<TenantCache>(cx, self->ddId);
|
||||
wait(ddTenantCache->build(cx));
|
||||
}
|
||||
|
||||
// When/If this assertion fails, Evan owes Ben a pat on the back for his foresight
|
||||
ASSERT(self->configuration.storageTeamSize > 0);
|
||||
|
||||
|
@ -601,6 +595,12 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
|
|||
state Reference<AsyncVar<bool>> processingWiggle(new AsyncVar<bool>(false));
|
||||
state Promise<Void> readyToStart;
|
||||
|
||||
state Optional<Reference<TenantCache>> ddTenantCache;
|
||||
if (ddIsTenantAware) {
|
||||
ddTenantCache = makeReference<TenantCache>(cx, self->ddId);
|
||||
wait(ddTenantCache.get()->build());
|
||||
}
|
||||
|
||||
self->shardsAffectedByTeamFailure = makeReference<ShardsAffectedByTeamFailure>();
|
||||
self->physicalShardCollection = makeReference<PhysicalShardCollection>();
|
||||
wait(self->resumeRelocations());
|
||||
|
@ -624,10 +624,6 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
|
|||
} else {
|
||||
anyZeroHealthyTeams = zeroHealthyTeams[0];
|
||||
}
|
||||
if (ddIsTenantAware) {
|
||||
actors.push_back(reportErrorsExcept(
|
||||
ddTenantCache->monitorTenantMap(), "DDTenantCacheMonitor", self->ddId, &normalDDQueueErrors()));
|
||||
}
|
||||
|
||||
actors.push_back(self->pollMoveKeysLock());
|
||||
actors.push_back(reportErrorsExcept(dataDistributionTracker(self->initData,
|
||||
|
@ -643,7 +639,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
|
|||
anyZeroHealthyTeams,
|
||||
self->ddId,
|
||||
&shards,
|
||||
&trackerCancelled),
|
||||
&trackerCancelled,
|
||||
ddTenantCache),
|
||||
"DDTracker",
|
||||
self->ddId,
|
||||
&normalDDQueueErrors()));
|
||||
|
@ -673,6 +670,13 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
|
|||
self->ddId,
|
||||
&normalDDQueueErrors()));
|
||||
|
||||
if (ddIsTenantAware) {
|
||||
actors.push_back(reportErrorsExcept(ddTenantCache.get()->monitorTenantMap(),
|
||||
"DDTenantCacheMonitor",
|
||||
self->ddId,
|
||||
&normalDDQueueErrors()));
|
||||
}
|
||||
|
||||
std::vector<DDTeamCollection*> teamCollectionsPtrs;
|
||||
primaryTeamCollection = makeReference<DDTeamCollection>(
|
||||
cx,
|
||||
|
|
|
@ -18,8 +18,11 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbserver/DDTeamCollection.h"
|
||||
#include "fdbserver/TenantCache.h"
|
||||
#include "flow/flow.h"
|
||||
#include <limits>
|
||||
#include <string>
|
||||
#include "flow/actorcompiler.h"
|
||||
|
@ -87,6 +90,8 @@ public:
|
|||
for (int i = 0; i < tenantList.size(); i++) {
|
||||
if (tenantCache->update(tenantList[i].first, tenantList[i].second)) {
|
||||
tenantListUpdated = true;
|
||||
TenantCacheTenantCreated req(tenantList[i].second.prefix);
|
||||
tenantCache->tenantCreationSignal.send(req);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -174,7 +179,7 @@ std::string TenantCache::desc() const {
|
|||
s += ", ";
|
||||
}
|
||||
|
||||
s += "Name: " + tenant->name().toString() + " Prefix: " + tenantPrefix.printable();
|
||||
s += "Name: " + tenant->name().toString() + " Prefix: " + tenantPrefix.toString();
|
||||
count++;
|
||||
}
|
||||
|
||||
|
@ -194,10 +199,23 @@ bool TenantCache::isTenantKey(KeyRef key) const {
|
|||
return true;
|
||||
}
|
||||
|
||||
Future<Void> TenantCache::build(Database cx) {
|
||||
Future<Void> TenantCache::build() {
|
||||
return TenantCacheImpl::build(this);
|
||||
}
|
||||
|
||||
Optional<Reference<TCTenantInfo>> TenantCache::tenantOwning(KeyRef key) const {
|
||||
auto it = tenantCache.lastLessOrEqual(key);
|
||||
if (it == tenantCache.end()) {
|
||||
return {};
|
||||
}
|
||||
|
||||
if (!key.startsWith(it->key)) {
|
||||
return {};
|
||||
}
|
||||
|
||||
return it->value;
|
||||
}
|
||||
|
||||
Future<Void> TenantCache::monitorTenantMap() {
|
||||
return TenantCacheImpl::monitorTenantMap(this);
|
||||
}
|
||||
|
|
|
@ -25,6 +25,9 @@
|
|||
#define FDBSERVER_DATA_DISTRIBUTION_ACTOR_H
|
||||
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/MoveKeys.actor.h"
|
||||
#include "fdbserver/TenantCache.h"
|
||||
#include "fdbserver/TCInfo.h"
|
||||
#include "fdbclient/RunTransaction.actor.h"
|
||||
#include "fdbserver/DDTxnProcessor.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
|
@ -45,7 +48,16 @@
|
|||
// RelocateReason to DataMovementReason is one-to-N mapping
|
||||
class RelocateReason {
|
||||
public:
|
||||
enum Value : int8_t { OTHER = 0, REBALANCE_DISK, REBALANCE_READ, MERGE_SHARD, SIZE_SPLIT, WRITE_SPLIT, __COUNT };
|
||||
enum Value : int8_t {
|
||||
OTHER = 0,
|
||||
REBALANCE_DISK,
|
||||
REBALANCE_READ,
|
||||
MERGE_SHARD,
|
||||
SIZE_SPLIT,
|
||||
WRITE_SPLIT,
|
||||
TENANT_SPLIT,
|
||||
__COUNT
|
||||
};
|
||||
RelocateReason(Value v) : value(v) { ASSERT(value != __COUNT); }
|
||||
explicit RelocateReason(int v) : value((Value)v) { ASSERT(value != __COUNT); }
|
||||
std::string toString() const {
|
||||
|
@ -62,6 +74,8 @@ public:
|
|||
return "SizeSplit";
|
||||
case WRITE_SPLIT:
|
||||
return "WriteSplit";
|
||||
case TENANT_SPLIT:
|
||||
return "TenantSplit";
|
||||
case __COUNT:
|
||||
ASSERT(false);
|
||||
}
|
||||
|
@ -153,130 +167,6 @@ private:
|
|||
moveReason(DataMovementReason::INVALID) {}
|
||||
};
|
||||
|
||||
struct IDataDistributionTeam {
|
||||
virtual std::vector<StorageServerInterface> getLastKnownServerInterfaces() const = 0;
|
||||
virtual int size() const = 0;
|
||||
virtual std::vector<UID> const& getServerIDs() const = 0;
|
||||
virtual void addDataInFlightToTeam(int64_t delta) = 0;
|
||||
virtual void addReadInFlightToTeam(int64_t delta) = 0;
|
||||
virtual int64_t getDataInFlightToTeam() const = 0;
|
||||
virtual int64_t getLoadBytes(bool includeInFlight = true, double inflightPenalty = 1.0) const = 0;
|
||||
virtual int64_t getReadInFlightToTeam() const = 0;
|
||||
virtual double getLoadReadBandwidth(bool includeInFlight = true, double inflightPenalty = 1.0) const = 0;
|
||||
virtual int64_t getMinAvailableSpace(bool includeInFlight = true) const = 0;
|
||||
virtual double getMinAvailableSpaceRatio(bool includeInFlight = true) const = 0;
|
||||
virtual bool hasHealthyAvailableSpace(double minRatio) const = 0;
|
||||
virtual Future<Void> updateStorageMetrics() = 0;
|
||||
virtual void addref() const = 0;
|
||||
virtual void delref() const = 0;
|
||||
virtual bool isHealthy() const = 0;
|
||||
virtual void setHealthy(bool) = 0;
|
||||
virtual int getPriority() const = 0;
|
||||
virtual void setPriority(int) = 0;
|
||||
virtual bool isOptimal() const = 0;
|
||||
virtual bool isWrongConfiguration() const = 0;
|
||||
virtual void setWrongConfiguration(bool) = 0;
|
||||
virtual void addServers(const std::vector<UID>& servers) = 0;
|
||||
virtual std::string getTeamID() const = 0;
|
||||
|
||||
std::string getDesc() const {
|
||||
const auto& servers = getLastKnownServerInterfaces();
|
||||
std::string s = format("TeamID %s; ", getTeamID().c_str());
|
||||
s += format("Size %d; ", servers.size());
|
||||
for (int i = 0; i < servers.size(); i++) {
|
||||
if (i)
|
||||
s += ", ";
|
||||
s += servers[i].address().toString() + " " + servers[i].id().shortString();
|
||||
}
|
||||
return s;
|
||||
}
|
||||
};
|
||||
|
||||
FDB_DECLARE_BOOLEAN_PARAM(WantNewServers);
|
||||
FDB_DECLARE_BOOLEAN_PARAM(WantTrueBest);
|
||||
FDB_DECLARE_BOOLEAN_PARAM(PreferLowerDiskUtil);
|
||||
FDB_DECLARE_BOOLEAN_PARAM(TeamMustHaveShards);
|
||||
FDB_DECLARE_BOOLEAN_PARAM(ForReadBalance);
|
||||
FDB_DECLARE_BOOLEAN_PARAM(PreferLowerReadUtil);
|
||||
FDB_DECLARE_BOOLEAN_PARAM(FindTeamByServers);
|
||||
|
||||
struct GetTeamRequest {
|
||||
bool wantsNewServers; // In additional to servers in completeSources, try to find teams with new server
|
||||
bool wantsTrueBest;
|
||||
bool preferLowerDiskUtil; // if true, lower utilized team has higher score
|
||||
bool teamMustHaveShards;
|
||||
bool forReadBalance;
|
||||
bool preferLowerReadUtil; // only make sense when forReadBalance is true
|
||||
double inflightPenalty;
|
||||
bool findTeamByServers;
|
||||
std::vector<UID> completeSources;
|
||||
std::vector<UID> src;
|
||||
Promise<std::pair<Optional<Reference<IDataDistributionTeam>>, bool>> reply;
|
||||
|
||||
typedef Reference<IDataDistributionTeam> TeamRef;
|
||||
|
||||
GetTeamRequest() {}
|
||||
GetTeamRequest(WantNewServers wantsNewServers,
|
||||
WantTrueBest wantsTrueBest,
|
||||
PreferLowerDiskUtil preferLowerDiskUtil,
|
||||
TeamMustHaveShards teamMustHaveShards,
|
||||
ForReadBalance forReadBalance = ForReadBalance::False,
|
||||
PreferLowerReadUtil preferLowerReadUtil = PreferLowerReadUtil::False,
|
||||
double inflightPenalty = 1.0)
|
||||
: wantsNewServers(wantsNewServers), wantsTrueBest(wantsTrueBest), preferLowerDiskUtil(preferLowerDiskUtil),
|
||||
teamMustHaveShards(teamMustHaveShards), forReadBalance(forReadBalance),
|
||||
preferLowerReadUtil(preferLowerReadUtil), inflightPenalty(inflightPenalty),
|
||||
findTeamByServers(FindTeamByServers::False) {}
|
||||
GetTeamRequest(std::vector<UID> servers)
|
||||
: wantsNewServers(WantNewServers::False), wantsTrueBest(WantTrueBest::False),
|
||||
preferLowerDiskUtil(PreferLowerDiskUtil::False), teamMustHaveShards(TeamMustHaveShards::False),
|
||||
forReadBalance(ForReadBalance::False), preferLowerReadUtil(PreferLowerReadUtil::False), inflightPenalty(1.0),
|
||||
findTeamByServers(FindTeamByServers::True), src(std::move(servers)) {}
|
||||
|
||||
// return true if a.score < b.score
|
||||
[[nodiscard]] bool lessCompare(TeamRef a, TeamRef b, int64_t aLoadBytes, int64_t bLoadBytes) const {
|
||||
int res = 0;
|
||||
if (forReadBalance) {
|
||||
res = preferLowerReadUtil ? greaterReadLoad(a, b) : lessReadLoad(a, b);
|
||||
}
|
||||
return res == 0 ? lessCompareByLoad(aLoadBytes, bLoadBytes) : res < 0;
|
||||
}
|
||||
|
||||
std::string getDesc() const {
|
||||
std::stringstream ss;
|
||||
|
||||
ss << "WantsNewServers:" << wantsNewServers << " WantsTrueBest:" << wantsTrueBest
|
||||
<< " PreferLowerDiskUtil:" << preferLowerDiskUtil << " teamMustHaveShards:" << teamMustHaveShards
|
||||
<< "forReadBalance" << forReadBalance << " inflightPenalty:" << inflightPenalty
|
||||
<< " findTeamByServers:" << findTeamByServers << ";";
|
||||
ss << "CompleteSources:";
|
||||
for (const auto& cs : completeSources) {
|
||||
ss << cs.toString() << ",";
|
||||
}
|
||||
|
||||
return std::move(ss).str();
|
||||
}
|
||||
|
||||
private:
|
||||
// return true if preferHigherUtil && aLoadBytes <= bLoadBytes (higher load bytes has larger score)
|
||||
// or preferLowerUtil && aLoadBytes > bLoadBytes
|
||||
bool lessCompareByLoad(int64_t aLoadBytes, int64_t bLoadBytes) const {
|
||||
bool lessLoad = aLoadBytes <= bLoadBytes;
|
||||
return preferLowerDiskUtil ? !lessLoad : lessLoad;
|
||||
}
|
||||
|
||||
// return -1 if a.readload > b.readload
|
||||
static int greaterReadLoad(TeamRef a, TeamRef b) {
|
||||
auto r1 = a->getLoadReadBandwidth(true), r2 = b->getLoadReadBandwidth(true);
|
||||
return r1 == r2 ? 0 : (r1 > r2 ? -1 : 1);
|
||||
}
|
||||
// return -1 if a.readload < b.readload
|
||||
static int lessReadLoad(TeamRef a, TeamRef b) {
|
||||
auto r1 = a->getLoadReadBandwidth(false), r2 = b->getLoadReadBandwidth(false);
|
||||
return r1 == r2 ? 0 : (r1 < r2 ? -1 : 1);
|
||||
}
|
||||
};
|
||||
|
||||
struct GetMetricsRequest {
|
||||
KeyRange keys;
|
||||
Promise<StorageMetrics> reply;
|
||||
|
@ -621,7 +511,8 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
|
|||
Reference<AsyncVar<bool>> zeroHealthyTeams,
|
||||
UID distributorId,
|
||||
KeyRangeMap<ShardTrackedData>* shards,
|
||||
bool* trackerCancelled);
|
||||
bool* trackerCancelled,
|
||||
Optional<Reference<TenantCache>> ddTenantCache);
|
||||
|
||||
ACTOR Future<Void> dataDistributionQueue(Database cx,
|
||||
PromiseStream<RelocateShard> output,
|
||||
|
|
|
@ -0,0 +1,147 @@
|
|||
/*
|
||||
* DataDistributionTeam.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "fdbclient/StorageServerInterface.h"
|
||||
|
||||
struct IDataDistributionTeam {
|
||||
virtual std::vector<StorageServerInterface> getLastKnownServerInterfaces() const = 0;
|
||||
virtual int size() const = 0;
|
||||
virtual std::vector<UID> const& getServerIDs() const = 0;
|
||||
virtual void addDataInFlightToTeam(int64_t delta) = 0;
|
||||
virtual void addReadInFlightToTeam(int64_t delta) = 0;
|
||||
virtual int64_t getDataInFlightToTeam() const = 0;
|
||||
virtual int64_t getLoadBytes(bool includeInFlight = true, double inflightPenalty = 1.0) const = 0;
|
||||
virtual int64_t getReadInFlightToTeam() const = 0;
|
||||
virtual double getLoadReadBandwidth(bool includeInFlight = true, double inflightPenalty = 1.0) const = 0;
|
||||
virtual int64_t getMinAvailableSpace(bool includeInFlight = true) const = 0;
|
||||
virtual double getMinAvailableSpaceRatio(bool includeInFlight = true) const = 0;
|
||||
virtual bool hasHealthyAvailableSpace(double minRatio) const = 0;
|
||||
virtual Future<Void> updateStorageMetrics() = 0;
|
||||
virtual void addref() const = 0;
|
||||
virtual void delref() const = 0;
|
||||
virtual bool isHealthy() const = 0;
|
||||
virtual void setHealthy(bool) = 0;
|
||||
virtual int getPriority() const = 0;
|
||||
virtual void setPriority(int) = 0;
|
||||
virtual bool isOptimal() const = 0;
|
||||
virtual bool isWrongConfiguration() const = 0;
|
||||
virtual void setWrongConfiguration(bool) = 0;
|
||||
virtual void addServers(const std::vector<UID>& servers) = 0;
|
||||
virtual std::string getTeamID() const = 0;
|
||||
|
||||
std::string getDesc() const {
|
||||
const auto& servers = getLastKnownServerInterfaces();
|
||||
std::string s = format("TeamID %s; ", getTeamID().c_str());
|
||||
s += format("Size %d; ", servers.size());
|
||||
for (int i = 0; i < servers.size(); i++) {
|
||||
if (i)
|
||||
s += ", ";
|
||||
s += servers[i].address().toString() + " " + servers[i].id().shortString();
|
||||
}
|
||||
return s;
|
||||
}
|
||||
};
|
||||
|
||||
FDB_DECLARE_BOOLEAN_PARAM(WantNewServers);
|
||||
FDB_DECLARE_BOOLEAN_PARAM(WantTrueBest);
|
||||
FDB_DECLARE_BOOLEAN_PARAM(PreferLowerDiskUtil);
|
||||
FDB_DECLARE_BOOLEAN_PARAM(TeamMustHaveShards);
|
||||
FDB_DECLARE_BOOLEAN_PARAM(ForReadBalance);
|
||||
FDB_DECLARE_BOOLEAN_PARAM(PreferLowerReadUtil);
|
||||
FDB_DECLARE_BOOLEAN_PARAM(FindTeamByServers);
|
||||
|
||||
struct GetTeamRequest {
|
||||
bool wantsNewServers; // In additional to servers in completeSources, try to find teams with new server
|
||||
bool wantsTrueBest;
|
||||
bool preferLowerDiskUtil; // if true, lower utilized team has higher score
|
||||
bool teamMustHaveShards;
|
||||
bool forReadBalance;
|
||||
bool preferLowerReadUtil; // only make sense when forReadBalance is true
|
||||
double inflightPenalty;
|
||||
bool findTeamByServers;
|
||||
std::vector<UID> completeSources;
|
||||
std::vector<UID> src;
|
||||
Promise<std::pair<Optional<Reference<IDataDistributionTeam>>, bool>> reply;
|
||||
|
||||
typedef Reference<IDataDistributionTeam> TeamRef;
|
||||
|
||||
GetTeamRequest() {}
|
||||
GetTeamRequest(WantNewServers wantsNewServers,
|
||||
WantTrueBest wantsTrueBest,
|
||||
PreferLowerDiskUtil preferLowerDiskUtil,
|
||||
TeamMustHaveShards teamMustHaveShards,
|
||||
ForReadBalance forReadBalance = ForReadBalance::False,
|
||||
PreferLowerReadUtil preferLowerReadUtil = PreferLowerReadUtil::False,
|
||||
double inflightPenalty = 1.0)
|
||||
: wantsNewServers(wantsNewServers), wantsTrueBest(wantsTrueBest), preferLowerDiskUtil(preferLowerDiskUtil),
|
||||
teamMustHaveShards(teamMustHaveShards), forReadBalance(forReadBalance),
|
||||
preferLowerReadUtil(preferLowerReadUtil), inflightPenalty(inflightPenalty),
|
||||
findTeamByServers(FindTeamByServers::False) {}
|
||||
GetTeamRequest(std::vector<UID> servers)
|
||||
: wantsNewServers(WantNewServers::False), wantsTrueBest(WantTrueBest::False),
|
||||
preferLowerDiskUtil(PreferLowerDiskUtil::False), teamMustHaveShards(TeamMustHaveShards::False),
|
||||
forReadBalance(ForReadBalance::False), preferLowerReadUtil(PreferLowerReadUtil::False), inflightPenalty(1.0),
|
||||
findTeamByServers(FindTeamByServers::True), src(std::move(servers)) {}
|
||||
|
||||
// return true if a.score < b.score
|
||||
[[nodiscard]] bool lessCompare(TeamRef a, TeamRef b, int64_t aLoadBytes, int64_t bLoadBytes) const {
|
||||
int res = 0;
|
||||
if (forReadBalance) {
|
||||
res = preferLowerReadUtil ? greaterReadLoad(a, b) : lessReadLoad(a, b);
|
||||
}
|
||||
return res == 0 ? lessCompareByLoad(aLoadBytes, bLoadBytes) : res < 0;
|
||||
}
|
||||
|
||||
std::string getDesc() const {
|
||||
std::stringstream ss;
|
||||
|
||||
ss << "WantsNewServers:" << wantsNewServers << " WantsTrueBest:" << wantsTrueBest
|
||||
<< " PreferLowerDiskUtil:" << preferLowerDiskUtil << " teamMustHaveShards:" << teamMustHaveShards
|
||||
<< "forReadBalance" << forReadBalance << " inflightPenalty:" << inflightPenalty
|
||||
<< " findTeamByServers:" << findTeamByServers << ";";
|
||||
ss << "CompleteSources:";
|
||||
for (const auto& cs : completeSources) {
|
||||
ss << cs.toString() << ",";
|
||||
}
|
||||
|
||||
return std::move(ss).str();
|
||||
}
|
||||
|
||||
private:
|
||||
// return true if preferHigherUtil && aLoadBytes <= bLoadBytes (higher load bytes has larger score)
|
||||
// or preferLowerUtil && aLoadBytes > bLoadBytes
|
||||
bool lessCompareByLoad(int64_t aLoadBytes, int64_t bLoadBytes) const {
|
||||
bool lessLoad = aLoadBytes <= bLoadBytes;
|
||||
return preferLowerDiskUtil ? !lessLoad : lessLoad;
|
||||
}
|
||||
|
||||
// return -1 if a.readload > b.readload
|
||||
static int greaterReadLoad(TeamRef a, TeamRef b) {
|
||||
auto r1 = a->getLoadReadBandwidth(true), r2 = b->getLoadReadBandwidth(true);
|
||||
return r1 == r2 ? 0 : (r1 > r2 ? -1 : 1);
|
||||
}
|
||||
// return -1 if a.readload < b.readload
|
||||
static int lessReadLoad(TeamRef a, TeamRef b) {
|
||||
auto r1 = a->getLoadReadBandwidth(false), r2 = b->getLoadReadBandwidth(false);
|
||||
return r1 == r2 ? 0 : (r1 < r2 ? -1 : 1);
|
||||
}
|
||||
};
|
|
@ -20,8 +20,11 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/Tenant.h"
|
||||
#include "fdbserver/DDTeamCollection.h"
|
||||
#include "fdbrpc/ReplicationTypes.h"
|
||||
#include "fdbserver/DataDistributionTeam.h"
|
||||
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/FastRef.h"
|
||||
|
||||
|
@ -29,6 +32,7 @@ class TCTeamInfo;
|
|||
class TCTenantInfo;
|
||||
class TCMachineInfo;
|
||||
class TCMachineTeamInfo;
|
||||
class DDTeamCollection;
|
||||
|
||||
class TCServerInfo : public ReferenceCounted<TCServerInfo> {
|
||||
friend class TCServerInfoImpl;
|
||||
|
@ -257,8 +261,8 @@ public:
|
|||
TCTenantInfo(TenantInfo tinfo, Key prefix) : m_tenantInfo(tinfo), m_prefix(prefix) {}
|
||||
std::vector<Reference<TCTeamInfo>>& teams() { return m_tenantTeams; }
|
||||
|
||||
TenantName name() { return m_tenantInfo.name.get(); }
|
||||
std::string prefixDesc() { return m_prefix.printable(); }
|
||||
TenantName name() const { return m_tenantInfo.name.get(); }
|
||||
std::string prefixDesc() const { return m_prefix.printable(); }
|
||||
|
||||
void addTeam(TCTeamInfo team);
|
||||
void removeTeam(TCTeamInfo team);
|
||||
|
|
|
@ -18,17 +18,26 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbclient/Tenant.h"
|
||||
#include "fdbserver/DDTeamCollection.h"
|
||||
#include "fdbserver/TCInfo.h"
|
||||
#include "flow/IRandom.h"
|
||||
#include "flow/IndexedSet.h"
|
||||
#include "flow/flow.h"
|
||||
#include <limits>
|
||||
#include <string>
|
||||
|
||||
typedef Map<KeyRef, Reference<TCTenantInfo>> TenantMapByPrefix;
|
||||
|
||||
struct TenantCacheTenantCreated {
|
||||
KeyRange keys;
|
||||
Promise<bool> reply;
|
||||
TenantCacheTenantCreated(Key prefix) { keys = prefixRange(prefix); }
|
||||
};
|
||||
|
||||
class TenantCache : public ReferenceCounted<TenantCache> {
|
||||
friend class TenantCacheImpl;
|
||||
friend class TenantCacheUnitTest;
|
||||
|
@ -62,11 +71,15 @@ public:
|
|||
generation = deterministicRandom()->randomUInt32();
|
||||
}
|
||||
|
||||
Future<Void> build(Database cx);
|
||||
PromiseStream<TenantCacheTenantCreated> tenantCreationSignal;
|
||||
|
||||
Future<Void> build();
|
||||
|
||||
Future<Void> monitorTenantMap();
|
||||
|
||||
std::string desc() const;
|
||||
|
||||
bool isTenantKey(KeyRef key) const;
|
||||
|
||||
Optional<Reference<TCTenantInfo>> tenantOwning(KeyRef key) const;
|
||||
};
|
||||
|
|
|
@ -140,7 +140,7 @@ struct WorkerDetails {
|
|||
bool degraded;
|
||||
bool recoveredDiskFiles;
|
||||
|
||||
WorkerDetails() : degraded(false) {}
|
||||
WorkerDetails() : degraded(false), recoveredDiskFiles(false) {}
|
||||
WorkerDetails(const WorkerInterface& interf, ProcessClass processClass, bool degraded, bool recoveredDiskFiles)
|
||||
: interf(interf), processClass(processClass), degraded(degraded), recoveredDiskFiles(recoveredDiskFiles) {}
|
||||
|
||||
|
@ -148,7 +148,7 @@ struct WorkerDetails {
|
|||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, interf, processClass, degraded);
|
||||
serializer(ar, interf, processClass, degraded, recoveredDiskFiles);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -466,6 +466,16 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
state Key middleKey = range.begin.withSuffix("AF"_sr);
|
||||
state Key middleKey2 = range.begin.withSuffix("AG"_sr);
|
||||
|
||||
if (BGRW_DEBUG) {
|
||||
fmt::print("IdempotentUnit: [{0} - {1})\n", range.begin.printable(), range.end.printable());
|
||||
}
|
||||
|
||||
// unblobbifying range that already doesn't exist should be no-op
|
||||
if (deterministicRandom()->coinflip()) {
|
||||
bool unblobbifyStartSuccess = wait(self->setRange(cx, activeRange, false));
|
||||
ASSERT(unblobbifyStartSuccess);
|
||||
}
|
||||
|
||||
bool success = wait(self->setRange(cx, activeRange, true));
|
||||
ASSERT(success);
|
||||
wait(self->checkRange(cx, self, activeRange, true));
|
||||
|
@ -544,8 +554,11 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
bool unblobbifyFail8 = wait(self->setRange(cx, KeyRangeRef(middleKey, middleKey2), false));
|
||||
ASSERT(!unblobbifyFail8);
|
||||
|
||||
bool unblobbifySuccess = wait(self->setRange(cx, activeRange, false));
|
||||
ASSERT(!unblobbifySuccess);
|
||||
bool unblobbifySuccess = wait(self->setRange(cx, activeRange, true));
|
||||
ASSERT(unblobbifySuccess);
|
||||
|
||||
bool unblobbifySuccessAgain = wait(self->setRange(cx, activeRange, true));
|
||||
ASSERT(unblobbifySuccessAgain);
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
@ -592,7 +605,6 @@ struct BlobGranuleRangesWorkload : TestWorkload {
|
|||
|
||||
// FIXME: fix bugs and enable these tests!
|
||||
excludedTypes.insert(RANGES_MISALIGNED); // TODO - fix in blob manager
|
||||
excludedTypes.insert(BLOBBIFY_IDEMPOTENT); // fix already in progress in a separate PR
|
||||
excludedTypes.insert(RE_BLOBBIFY); // TODO - fix is non-trivial, is desired behavior eventually
|
||||
|
||||
std::string nextRangeKey = "U_" + self->newKey();
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
|
||||
#include "fmt/format.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "flow/SignalSafeUnwind.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
// Stress test the slow task profiler or flow profiler
|
||||
|
@ -42,22 +43,24 @@ struct SlowTaskWorkload : TestWorkload {
|
|||
|
||||
ACTOR static Future<Void> go() {
|
||||
wait(delay(1));
|
||||
int64_t phc = dl_iterate_phdr_calls;
|
||||
int64_t startProfilesDeferred = getNumProfilesDeferred();
|
||||
int64_t startProfilesOverflowed = getNumProfilesOverflowed();
|
||||
int64_t startProfilesCaptured = getNumProfilesCaptured();
|
||||
int64_t exc = 0;
|
||||
fprintf(stdout, "Slow task starting\n");
|
||||
fprintf(stderr, "Slow task starting\n");
|
||||
for (int i = 0; i < 10; i++) {
|
||||
fprintf(stdout, " %d\n", i);
|
||||
fprintf(stderr, " %d\n", i);
|
||||
double end = timer() + 1;
|
||||
while (timer() < end) {
|
||||
do_slow_exception_thing(&exc);
|
||||
}
|
||||
}
|
||||
fmt::print(stdout,
|
||||
"Slow task complete: {0} exceptions; {1} profiles deferred, {2} profiles overflowed, {3} profiles "
|
||||
"captured\n",
|
||||
fmt::print(stderr,
|
||||
"Slow task complete: {0} exceptions; {1} calls to dl_iterate_phdr, {2}"
|
||||
" profiles deferred, {3} profiles overflowed, {4} profiles captured\n",
|
||||
exc,
|
||||
dl_iterate_phdr_calls - phc,
|
||||
getNumProfilesDeferred() - startProfilesDeferred,
|
||||
getNumProfilesOverflowed() - startProfilesOverflowed,
|
||||
getNumProfilesCaptured() - startProfilesCaptured);
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* ProtocolVersion.h
|
||||
* ApiVersion.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
|
@ -18,15 +18,18 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FLOW_CODE_API_VERSION_H
|
||||
#define FLOW_CODE_API_VERSION_H
|
||||
|
||||
#pragma once
|
||||
#include "flow/Trace.h"
|
||||
#include <cstdint>
|
||||
|
||||
constexpr int noBackwardsCompatibility = 13;
|
||||
|
||||
// The first check second expression version doesn't need to change because it's just for earlier protocol versions.
|
||||
// The first check second expression version doesn't need to change because it's just for earlier API versions.
|
||||
#define API_VERSION_FEATURE(v, x) \
|
||||
static_assert(v <= @FDB_AV_LATEST_VERSION@, "Feature protocol version too large"); \
|
||||
static_assert(v <= @FDB_AV_LATEST_VERSION@, "Feature API version too large"); \
|
||||
struct x { \
|
||||
static constexpr uint64_t apiVersion = v; \
|
||||
}; \
|
||||
|
@ -39,7 +42,6 @@ class ApiVersion {
|
|||
public:
|
||||
// Statics.
|
||||
constexpr static int LATEST_VERSION = @FDB_AV_LATEST_VERSION@;
|
||||
constexpr static int FUTURE_VERSION = @FDB_AV_FUTURE_VERSION@;
|
||||
|
||||
constexpr explicit ApiVersion(int version) : _version(version) {}
|
||||
constexpr ApiVersion() : _version(0) {}
|
||||
|
@ -61,12 +63,15 @@ public:
|
|||
public: // introduced features
|
||||
API_VERSION_FEATURE(@FDB_AV_SNAPSHOT_RYW@, SnapshotRYW);
|
||||
API_VERSION_FEATURE(@FDB_AV_INLINE_UPDATE_DATABASE@, InlineUpdateDatabase);
|
||||
API_VERSION_FEATURE(@FDB_AV_PERSISTENT_OPTIONS@, PersistentOptions);
|
||||
API_VERSION_FEATURE(@FDB_AV_PERSISTENT_OPTIONS@, PersistentOptions);
|
||||
API_VERSION_FEATURE(@FDB_AV_TRACE_FILE_IDENTIFIER@, TraceFileIdentifier);
|
||||
API_VERSION_FEATURE(@FDB_AV_CLUSTER_SHARED_STATE_MAP@, ClusterSharedStateMap);
|
||||
API_VERSION_FEATURE(@FDB_AV_TENANTS_V1@, TenantsV1);
|
||||
API_VERSION_FEATURE(@FDB_AV_TENANTS_V1@, TenantsV1);
|
||||
API_VERSION_FEATURE(@FDB_AV_BLOB_RANGE_API@, BlobRangeApi);
|
||||
API_VERSION_FEATURE(@FDB_AV_CREATE_DB_FROM_CONN_STRING@, CreateDBFromConnString);
|
||||
API_VERSION_FEATURE(@FDB_AV_FUTURE_GET_BOOL@, FutureGetBool);
|
||||
API_VERSION_FEATURE(@FDB_AV_FUTURE_PROTOCOL_VERSION_API@, FutureProtocolVersionApi);
|
||||
API_VERSION_FEATURE(@FDB_AV_TENANTS_V2@, TenantsV2);
|
||||
API_VERSION_FEATURE(@FDB_AV_TENANTS_V2@, TenantsV2);
|
||||
};
|
||||
|
||||
#endif // FLOW_CODE_API_VERSION_H
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
# API Versions
|
||||
set(FDB_AV_LATEST_VERSION "720")
|
||||
set(FDB_AV_FUTURE_VERSION "730")
|
||||
|
||||
# Features
|
||||
set(FDB_AV_SNAPSHOT_RYW "300")
|
||||
set(FDB_AV_INLINE_UPDATE_DATABASE "610")
|
||||
set(FDB_AV_PERSISTENT_OPTIONS "610")
|
||||
set(FDB_AV_TRACE_FILE_IDENTIFIER "630")
|
||||
set(FDB_AV_CLUSTER_SHARED_STATE_MAP "710")
|
||||
set(FDB_AV_TENANTS_V1 "720")
|
||||
set(FDB_AV_BLOB_RANGE_API "720")
|
||||
|
|
|
@ -49,6 +49,7 @@ enum EMkCertOpt : int {
|
|||
OPT_PRINT_SERVER_CERT,
|
||||
OPT_PRINT_CLIENT_CERT,
|
||||
OPT_PRINT_ARGUMENTS,
|
||||
OPT_ENABLE_TRACE,
|
||||
};
|
||||
|
||||
CSimpleOpt::SOption gOptions[] = { { OPT_HELP, "--help", SO_NONE },
|
||||
|
@ -68,6 +69,7 @@ CSimpleOpt::SOption gOptions[] = { { OPT_HELP, "--help", SO_NONE },
|
|||
{ OPT_PRINT_SERVER_CERT, "--print-server-cert", SO_NONE },
|
||||
{ OPT_PRINT_CLIENT_CERT, "--print-client-cert", SO_NONE },
|
||||
{ OPT_PRINT_ARGUMENTS, "--print-args", SO_NONE },
|
||||
{ OPT_ENABLE_TRACE, "--trace", SO_NONE },
|
||||
SO_END_OF_OPTIONS };
|
||||
|
||||
template <size_t Len>
|
||||
|
@ -191,6 +193,7 @@ int main(int argc, char** argv) {
|
|||
auto printServerCert = false;
|
||||
auto printClientCert = false;
|
||||
auto printArguments = false;
|
||||
auto enableTrace = false;
|
||||
auto args = CSimpleOpt(argc, argv, gOptions, SO_O_EXACT | SO_O_HYPHEN_TO_UNDERSCORE);
|
||||
while (args.Next()) {
|
||||
if (auto err = args.LastError()) {
|
||||
|
@ -266,6 +269,8 @@ int main(int argc, char** argv) {
|
|||
case OPT_PRINT_ARGUMENTS:
|
||||
printArguments = true;
|
||||
break;
|
||||
case OPT_ENABLE_TRACE:
|
||||
enableTrace = true;
|
||||
default:
|
||||
fmt::print(stderr, "ERROR: Unknown option {}\n", args.OptionText());
|
||||
return FDB_EXIT_ERROR;
|
||||
|
@ -277,12 +282,14 @@ int main(int argc, char** argv) {
|
|||
platformInit();
|
||||
Error::init();
|
||||
g_network = newNet2(TLSConfig());
|
||||
openTraceFile(NetworkAddress(), 10 << 20, 10 << 20, ".", "mkcert");
|
||||
if (enableTrace)
|
||||
openTraceFile(NetworkAddress(), 10 << 20, 10 << 20, ".", "mkcert");
|
||||
auto thread = std::thread([]() { g_network->run(); });
|
||||
auto cleanUpGuard = ScopeExit([&thread]() {
|
||||
auto cleanUpGuard = ScopeExit([&thread, enableTrace]() {
|
||||
g_network->stop();
|
||||
thread.join();
|
||||
closeTraceFile();
|
||||
if (enableTrace)
|
||||
closeTraceFile();
|
||||
});
|
||||
|
||||
serverArgs.transformPathToAbs();
|
||||
|
|
|
@ -90,6 +90,10 @@ void initProfiling() {
|
|||
net2backtraces = new volatile void*[net2backtraces_max];
|
||||
other_backtraces = new volatile void*[net2backtraces_max];
|
||||
|
||||
// According to folk wisdom, calling this once before setting up the signal handler makes
|
||||
// it async signal safe in practice :-/
|
||||
backtrace(const_cast<void**>(other_backtraces), net2backtraces_max);
|
||||
|
||||
sigemptyset(&sigprof_set);
|
||||
sigaddset(&sigprof_set, SIGPROF);
|
||||
}
|
||||
|
|
|
@ -3765,7 +3765,7 @@ void profileHandler(int sig) {
|
|||
ps->timestamp = checkThreadTime.is_lock_free() ? checkThreadTime.load() : 0;
|
||||
|
||||
// SOMEDAY: should we limit the maximum number of frames from backtrace beyond just available space?
|
||||
size_t size = platform::raw_backtrace(ps->frames, net2backtraces_max - net2backtraces_offset - 2);
|
||||
size_t size = backtrace(ps->frames, net2backtraces_max - net2backtraces_offset - 2);
|
||||
|
||||
ps->length = size;
|
||||
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* SignalSafeUnwind.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "flow/SignalSafeUnwind.h"
|
||||
|
||||
int64_t dl_iterate_phdr_calls = 0;
|
||||
|
||||
#if defined(__linux__) && !defined(USE_SANITIZER)
|
||||
|
||||
#include <link.h>
|
||||
#include <mutex>
|
||||
|
||||
static int (*chain_dl_iterate_phdr)(int (*callback)(struct dl_phdr_info* info, size_t size, void* data),
|
||||
void* data) = nullptr;
|
||||
|
||||
static void initChain() {
|
||||
static std::once_flag flag;
|
||||
|
||||
// Ensure that chain_dl_iterate_phdr points to the "real" function that we are overriding
|
||||
std::call_once(flag, []() { *(void**)&chain_dl_iterate_phdr = dlsym(RTLD_NEXT, "dl_iterate_phdr"); });
|
||||
|
||||
if (!chain_dl_iterate_phdr) {
|
||||
criticalError(FDB_EXIT_ERROR, "SignalSafeUnwindError", "Unable to find dl_iterate_phdr symbol");
|
||||
}
|
||||
}
|
||||
|
||||
// This overrides the function in libc!
|
||||
extern "C" int dl_iterate_phdr(int (*callback)(struct dl_phdr_info* info, size_t size, void* data), void* data) {
|
||||
interlockedIncrement64(&dl_iterate_phdr_calls);
|
||||
|
||||
initChain();
|
||||
|
||||
setProfilingEnabled(0);
|
||||
int result = chain_dl_iterate_phdr(callback, data);
|
||||
setProfilingEnabled(1);
|
||||
return result;
|
||||
}
|
||||
#endif
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* SignalSafeUnwind.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FLOW_SIGNAL_SAFE_UNWIND
|
||||
#define FLOW_SIGNAL_SAFE_UNWIND
|
||||
#pragma once
|
||||
|
||||
#include "flow/Platform.h"
|
||||
|
||||
// This can be used by tests to measure the number of calls to dl_iterate_phdr intercepted
|
||||
extern int64_t dl_iterate_phdr_calls;
|
||||
|
||||
#endif
|
Loading…
Reference in New Issue