diff --git a/CMakeLists.txt b/CMakeLists.txt index a23b5c304b..710e049bec 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -157,6 +157,7 @@ if(CMAKE_SYSTEM_NAME STREQUAL "FreeBSD") endif() include(CompileBoost) +include(GetMsgpack) add_subdirectory(flow) add_subdirectory(fdbrpc) add_subdirectory(fdbclient) diff --git a/bindings/c/test/mako/mako.c b/bindings/c/test/mako/mako.c index a03f192808..ac21347e3f 100644 --- a/bindings/c/test/mako/mako.c +++ b/bindings/c/test/mako/mako.c @@ -189,10 +189,19 @@ int cleanup(FDBTransaction* transaction, mako_args_t* args) { free(prefixstr); len += 1; +retryTxn: clock_gettime(CLOCK_MONOTONIC_COARSE, &timer_start); + fdb_transaction_clear_range(transaction, (uint8_t*)beginstr, len + 1, (uint8_t*)endstr, len + 1); - if (commit_transaction(transaction) != FDB_SUCCESS) - goto failExit; + switch (commit_transaction(transaction)) { + case (FDB_SUCCESS): + break; + case (FDB_ERROR_RETRY): + fdb_transaction_reset(transaction); + goto retryTxn; + default: + goto failExit; + } fdb_transaction_reset(transaction); clock_gettime(CLOCK_MONOTONIC_COARSE, &timer_end); @@ -308,11 +317,19 @@ int populate(FDBTransaction* transaction, /* commit every 100 inserts (default) */ if (i % args->txnspec.ops[OP_INSERT][OP_COUNT] == 0) { + retryTxn: if (stats->xacts % args->sampling == 0) { clock_gettime(CLOCK_MONOTONIC, &timer_start_commit); } - if (commit_transaction(transaction) != FDB_SUCCESS) - goto failExit; + + switch (commit_transaction(transaction)) { + case (FDB_SUCCESS): + break; + case (FDB_ERROR_RETRY): + goto retryTxn; + default: + goto failExit; + } /* xact latency stats */ if (stats->xacts % args->sampling == 0) { @@ -337,20 +354,36 @@ int populate(FDBTransaction* transaction, xacts++; /* for throttling */ } } - - if (stats->xacts % args->sampling == 0) { - clock_gettime(CLOCK_MONOTONIC, &timer_start_commit); - } - if (commit_transaction(transaction) != FDB_SUCCESS) - goto failExit; - - /* xact latency stats */ - if (stats->xacts % args->sampling == 0) { - clock_gettime(CLOCK_MONOTONIC, &timer_per_xact_end); - update_op_lat_stats( - &timer_start_commit, &timer_per_xact_end, OP_COMMIT, stats, block, elem_size, is_memory_allocated); - update_op_lat_stats( - &timer_per_xact_start, &timer_per_xact_end, OP_TRANSACTION, stats, block, elem_size, is_memory_allocated); + time_t start_time_sec, current_time_sec; + time(&start_time_sec); + int is_committed = false; + // will hit FDB_ERROR_RETRY if running mako with multi-version client + while (!is_committed) { + if (stats->xacts % args->sampling == 0) { + clock_gettime(CLOCK_MONOTONIC, &timer_start_commit); + } + int rc; + if ((rc = commit_transaction(transaction) != FDB_SUCCESS)) { + if (rc == FDB_ERROR_RETRY) { + time(¤t_time_sec); + if (difftime(current_time_sec, start_time_sec) > 5) { + goto failExit; + } else { + continue; + } + } else { + goto failExit; + } + } + is_committed = true; + /* xact latency stats */ + if (stats->xacts % args->sampling == 0) { + clock_gettime(CLOCK_MONOTONIC, &timer_per_xact_end); + update_op_lat_stats( + &timer_start_commit, &timer_per_xact_end, OP_COMMIT, stats, block, elem_size, is_memory_allocated); + update_op_lat_stats( + &timer_per_xact_start, &timer_per_xact_end, OP_TRANSACTION, stats, block, elem_size, is_memory_allocated); + } } clock_gettime(CLOCK_MONOTONIC, &timer_end); diff --git a/bindings/python/tests/fdbcli_tests.py b/bindings/python/tests/fdbcli_tests.py index 14224a6ca8..17a94e16b1 100755 --- a/bindings/python/tests/fdbcli_tests.py +++ b/bindings/python/tests/fdbcli_tests.py @@ -332,9 +332,10 @@ def transaction(logger): output7 = run_fdbcli_command('get', 'key') assert output7 == "`key': not found" -def get_fdb_process_addresses(): +def get_fdb_process_addresses(logger): # get all processes' network addresses output = run_fdbcli_command('kill') + logger.debug(output) # except the first line, each line is one process addresses = output.split('\n')[1:] assert len(addresses) == process_number @@ -354,7 +355,7 @@ def coordinators(logger): assert coordinator_list[0]['address'] == coordinators # verify the cluster description assert get_value_from_status_json(True, 'cluster', 'connection_string').startswith('{}:'.format(cluster_description)) - addresses = get_fdb_process_addresses() + addresses = get_fdb_process_addresses(logger) # set all 5 processes as coordinators and update the cluster description new_cluster_description = 'a_simple_description' run_fdbcli_command('coordinators', *addresses, 'description={}'.format(new_cluster_description)) @@ -369,7 +370,7 @@ def coordinators(logger): @enable_logging() def exclude(logger): # get all processes' network addresses - addresses = get_fdb_process_addresses() + addresses = get_fdb_process_addresses(logger) logger.debug("Cluster processes: {}".format(' '.join(addresses))) # There should be no excluded process for now no_excluded_process_output = 'There are currently no servers or localities excluded from the database.' @@ -377,16 +378,28 @@ def exclude(logger): assert no_excluded_process_output in output1 # randomly pick one and exclude the process excluded_address = random.choice(addresses) + # If we see "not enough space" error, use FORCE option to proceed + # this should be a safe operation as we do not need any storage space for the test + force = False # sometimes we need to retry the exclude while True: logger.debug("Excluding process: {}".format(excluded_address)) - error_message = run_fdbcli_command_and_get_error('exclude', excluded_address) + if force: + error_message = run_fdbcli_command_and_get_error('exclude', 'FORCE', excluded_address) + else: + error_message = run_fdbcli_command_and_get_error('exclude', excluded_address) if error_message == 'WARNING: {} is a coordinator!'.format(excluded_address): # exclude coordinator will print the warning, verify the randomly selected process is the coordinator coordinator_list = get_value_from_status_json(True, 'client', 'coordinators', 'coordinators') assert len(coordinator_list) == 1 assert coordinator_list[0]['address'] == excluded_address break + elif 'ERROR: This exclude may cause the total free space in the cluster to drop below 10%.' in error_message: + # exclude the process may cause the free space not enough + # use FORCE option to ignore it and proceed + assert not force + force = True + logger.debug("Use FORCE option to exclude the process") elif not error_message: break else: @@ -450,10 +463,7 @@ if __name__ == '__main__': throttle() else: assert process_number > 1, "Process number should be positive" - # the kill command which used to list processes seems to not work as expected sometime - # which makes the test flaky. - # We need to figure out the reason and then re-enable these tests - #coordinators() - #exclude() + coordinators() + exclude() diff --git a/cmake/GetMsgpack.cmake b/cmake/GetMsgpack.cmake new file mode 100644 index 0000000000..b3313f336e --- /dev/null +++ b/cmake/GetMsgpack.cmake @@ -0,0 +1,20 @@ +find_package(msgpack 3.3.0 EXACT QUIET CONFIG) + +add_library(msgpack INTERFACE) + +if(msgpack_FOUND) + target_link_libraries(msgpack INTERFACE msgpackc-cxx) +else() + include(ExternalProject) + ExternalProject_add(msgpackProject + URL "https://github.com/msgpack/msgpack-c/releases/download/cpp-3.3.0/msgpack-3.3.0.tar.gz" + URL_HASH SHA256=6e114d12a5ddb8cb11f669f83f32246e484a8addd0ce93f274996f1941c1f07b + CONFIGURE_COMMAND "" + BUILD_COMMAND "" + INSTALL_COMMAND "" + ) + + ExternalProject_Get_property(msgpackProject SOURCE_DIR) + target_include_directories(msgpack SYSTEM INTERFACE "${SOURCE_DIR}/include") + add_dependencies(msgpack msgpackProject) +endif() diff --git a/fdbbackup/FileDecoder.actor.cpp b/fdbbackup/FileDecoder.actor.cpp index 19dbaf4f80..aa61dddeff 100644 --- a/fdbbackup/FileDecoder.actor.cpp +++ b/fdbbackup/FileDecoder.actor.cpp @@ -506,7 +506,7 @@ ACTOR Future decode_logs(DecodeParams params) { wait(process_file(container, logs[idx], uid, params)); idx++; } - TraceEvent("DecodeDone", uid); + TraceEvent("DecodeDone", uid).log(); return Void(); } diff --git a/fdbclient/ActorLineageProfiler.cpp b/fdbclient/ActorLineageProfiler.cpp new file mode 100644 index 0000000000..00cd51ffb7 --- /dev/null +++ b/fdbclient/ActorLineageProfiler.cpp @@ -0,0 +1,394 @@ +/* + * ActorLineageProfiler.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "flow/flow.h" +#include "flow/singleton.h" +#include "fdbrpc/IAsyncFile.h" +#include "fdbclient/ActorLineageProfiler.h" +#include "fdbclient/NameLineage.h" +#include +#include +#include +#include +#include + +using namespace std::literals; + +class Packer : public msgpack::packer { + struct visitor_t { + using VisitorMap = std::unordered_map>; + VisitorMap visitorMap; + + template + static void any_visitor(std::any const& val, Packer& packer) { + const T& v = std::any_cast(val); + packer.pack(v); + } + + template + struct populate_visitor_map; + template + struct populate_visitor_map { + static void populate(VisitorMap& map) { + map.emplace(std::type_index(typeid(Head)), any_visitor); + populate_visitor_map::populate(map); + } + }; + template + struct populate_visitor_map { + static void populate(VisitorMap&) {} + }; + + visitor_t() { + populate_visitor_map, + std::vector, + std::vector, + std::map, + std::map, + std::vector>>::populate(visitorMap); + } + + void visit(const std::any& val, Packer& packer) { + auto iter = visitorMap.find(val.type()); + if (iter == visitorMap.end()) { + TraceEvent(SevError, "PackerTypeNotFound").detail("Type", val.type().name()); + } else { + iter->second(val, packer); + } + } + }; + msgpack::sbuffer sbuffer; + // Initializing visitor_t involves building a type-map. As this is a relatively expensive operation, we don't want + // to do this each time we create a Packer object. So visitor_t is a stateless class and we only use it as a + // visitor. + crossbow::singleton visitor; + +public: + Packer() : msgpack::packer(sbuffer) {} + + void pack(std::any const& val) { visitor->visit(val, *this); } + + void pack(bool val) { + if (val) { + pack_true(); + } else { + pack_false(); + } + } + + void pack(uint64_t val) { + if (val <= std::numeric_limits::max()) { + pack_uint8(uint8_t(val)); + } else if (val <= std::numeric_limits::max()) { + pack_uint16(uint16_t(val)); + } else if (val <= std::numeric_limits::max()) { + pack_uint32(uint32_t(val)); + } else { + pack_uint64(val); + } + } + + void pack(int64_t val) { + if (val >= 0) { + this->pack(uint64_t(val)); + } else if (val >= std::numeric_limits::min()) { + pack_int8(int8_t(val)); + } else if (val >= std::numeric_limits::min()) { + pack_int16(int16_t(val)); + } else if (val >= std::numeric_limits::min()) { + pack_int32(int32_t(val)); + } else if (val >= std::numeric_limits::min()) { + pack_int64(int64_t(val)); + } + } + + void pack(float val) { pack_float(val); } + void pack(double val) { pack_double(val); } + void pack(std::string const& str) { + pack_str(str.size()); + pack_str_body(str.data(), str.size()); + } + + void pack(std::string_view val) { + pack_str(val.size()); + pack_str_body(val.data(), val.size()); + } + + template + void pack(std::map const& map) { + pack_map(map.size()); + for (const auto& p : map) { + pack(p.first); + pack(p.second); + } + } + + template + void pack(std::vector const& val) { + pack_array(val.size()); + for (const auto& v : val) { + pack(v); + } + } + + std::pair getbuf() { + unsigned size = sbuffer.size(); + return std::make_pair(sbuffer.release(), size); + } +}; + +IALPCollectorBase::IALPCollectorBase() { + SampleCollector::instance().addCollector(this); +} + +std::map SampleCollectorT::collect(ActorLineage* lineage) { + ASSERT(lineage != nullptr); + std::map out; + for (auto& collector : collectors) { + auto val = collector->collect(lineage); + if (val.has_value()) { + out[collector->name()] = val.value(); + } + } + return out; +} + +std::shared_ptr SampleCollectorT::collect() { + auto sample = std::make_shared(); + double time = g_network->now(); + sample->time = time; + for (auto& p : getSamples) { + Packer packer; + std::vector> samples; + auto sampleVec = p.second(); + for (auto& val : sampleVec) { + auto m = collect(val.getPtr()); + if (!m.empty()) { + samples.emplace_back(std::move(m)); + } + } + if (!samples.empty()) { + packer.pack(samples); + sample->data[p.first] = packer.getbuf(); + } + } + return sample; +} + +void SampleCollection_t::collect(const Reference& lineage) { + ASSERT(lineage.isValid()); + _currentLineage = lineage; + auto sample = _collector->collect(); + ASSERT(sample); + { + Lock _{ mutex }; + data.emplace_back(sample); + } + auto min = std::min(data.back()->time - windowSize, data.back()->time); + double oldest = data.front()->time; + // we don't need to check for data.empty() in this loop (or the inner loop) as we know that we will end + // up with at least one entry which is the most recent sample + while (oldest < min) { + Lock _{ mutex }; + // we remove at most 10 elements at a time. This is so we don't block the main thread for too long. + for (int i = 0; i < 10 && oldest < min; ++i) { + data.pop_front(); + oldest = data.front()->time; + } + } + // TODO: Should only call ingest when deleting from memory + config->ingest(sample); +} + +std::vector> SampleCollection_t::get(double from /*= 0.0*/, + double to /*= std::numeric_limits::max()*/) const { + Lock _{ mutex }; + std::vector> res; + for (const auto& sample : data) { + if (sample->time > to) { + break; + } else if (sample->time >= from) { + res.push_back(sample); + } + } + return res; +} + +void sample(LineageReference* lineagePtr) { + if (!lineagePtr->isValid()) { return; } + (*lineagePtr)->modify(&NameLineage::actorName) = lineagePtr->actorName(); + boost::asio::post(ActorLineageProfiler::instance().context(), [lineage = LineageReference::addRef(lineagePtr->getPtr())]() { + SampleCollection::instance().collect(lineage); + }); +} + +struct ProfilerImpl { + boost::asio::io_context context; + boost::asio::executor_work_guard workGuard; + boost::asio::steady_timer timer; + std::thread mainThread; + unsigned frequency; + + SampleCollection collection; + + ProfilerImpl() : workGuard(context.get_executor()), timer(context) { + mainThread = std::thread([this]() { context.run(); }); + } + ~ProfilerImpl() { + setFrequency(0); + workGuard.reset(); + mainThread.join(); + } + + void profileHandler(boost::system::error_code const& ec) { + if (ec) { + return; + } + startSampling = true; + timer = boost::asio::steady_timer(context, std::chrono::microseconds(1000000 / frequency)); + timer.async_wait([this](auto const& ec) { profileHandler(ec); }); + } + + void setFrequency(unsigned frequency) { + boost::asio::post(context, [this, frequency]() { + this->frequency = frequency; + timer.cancel(); + if (frequency > 0) { + profileHandler(boost::system::error_code{}); + } + }); + } +}; + +ActorLineageProfilerT::ActorLineageProfilerT() : impl(new ProfilerImpl()) { + // collection->collector()->addGetter(WaitState::Network, + // std::bind(&ActorLineageSet::copy, std::ref(g_network->getActorLineageSet()))); + // collection->collector()->addGetter( + // WaitState::Disk, + // std::bind(&ActorLineageSet::copy, std::ref(IAsyncFileSystem::filesystem()->getActorLineageSet()))); + collection->collector()->addGetter(WaitState::Running, []() { + return std::vector>({ SampleCollection::instance().getLineage() }); + }); +} + +ActorLineageProfilerT::~ActorLineageProfilerT() { + delete impl; +} + +void ActorLineageProfilerT::setFrequency(unsigned frequency) { + impl->setFrequency(frequency); +} + +boost::asio::io_context& ActorLineageProfilerT::context() { + return impl->context; +} + +SampleIngestor::~SampleIngestor() {} + +void ProfilerConfigT::reset(std::map const& config) { + bool expectNoMore = false, useFluentD = false, useTCP = false; + std::string endpoint; + ConfigError err; + for (auto& kv : config) { + if (expectNoMore) { + err.description = format("Unexpected option %s", kv.first.c_str()); + throw err; + } + if (kv.first == "ingestor") { + std::string val = kv.second; + std::for_each(val.begin(), val.end(), [](auto c) { return std::tolower(c); }); + if (val == "none") { + setBackend(std::make_shared()); + } else if (val == "fluentd") { + useFluentD = true; + } else { + err.description = format("Unsupported ingestor: %s", val.c_str()); + throw err; + } + } else if (kv.first == "ingestor_endpoint") { + endpoint = kv.second; + } else if (kv.first == "ingestor_protocol") { + auto val = kv.second; + std::for_each(val.begin(), val.end(), [](auto c) { return std::tolower(c); }); + if (val == "tcp") { + useTCP = true; + } else if (val == "udp") { + useTCP = false; + } else { + err.description = format("Unsupported protocol for fluentd: %s", kv.second.c_str()); + throw err; + } + } else { + err.description = format("Unknown option %s", kv.first.c_str()); + throw err; + } + } + if (useFluentD) { + if (endpoint.empty()) { + err.description = "Endpoint is required for fluentd ingestor"; + throw err; + } + NetworkAddress address; + try { + address = NetworkAddress::parse(endpoint); + } catch (Error& e) { + err.description = format("Can't parse address %s", endpoint.c_str()); + throw err; + } + setBackend(std::make_shared( + useTCP ? FluentDIngestor::Protocol::TCP : FluentDIngestor::Protocol::UDP, address)); + } +} + +std::map ProfilerConfigT::getConfig() const { + std::map res; + if (ingestor) { + ingestor->getConfig(res); + } + return res; +} + +// Callback used to update the sampling profilers run frequency whenever the +// frequency changes. +void samplingProfilerUpdateFrequency(std::optional freq) { + double frequency = 0; + if (freq.has_value()) { + frequency = std::any_cast(freq.value()); + } + TraceEvent(SevInfo, "SamplingProfilerUpdateFrequency").detail("Frequency", frequency); + ActorLineageProfiler::instance().setFrequency(frequency); +} + +// Callback used to update the sample collector window size. +void samplingProfilerUpdateWindow(std::optional window) { + double duration = 0; + if (window.has_value()) { + duration = std::any_cast(window.value()); + } + TraceEvent(SevInfo, "SamplingProfilerUpdateWindow").detail("Duration", duration); + SampleCollection::instance().setWindowSize(duration); +} diff --git a/fdbclient/ActorLineageProfiler.h b/fdbclient/ActorLineageProfiler.h new file mode 100644 index 0000000000..a55d1541e1 --- /dev/null +++ b/fdbclient/ActorLineageProfiler.h @@ -0,0 +1,192 @@ +/* + * ActorLineageProfiler.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "fdbclient/AnnotateActor.h" + +#include +#include +#include +#include +#include +#include +#include "flow/singleton.h" +#include "flow/flow.h" + +void samplingProfilerUpdateFrequency(std::optional freq); +void samplingProfilerUpdateWindow(std::optional window); + +struct IALPCollectorBase { + virtual std::optional collect(ActorLineage*) = 0; + virtual const std::string_view& name() = 0; + IALPCollectorBase(); +}; + +template +struct IALPCollector : IALPCollectorBase { + const std::string_view& name() override { return T::name; } +}; + +struct Sample : std::enable_shared_from_this { + double time = 0.0; + Sample() {} + Sample(Sample const&) = delete; + Sample& operator=(Sample const&) = delete; + std::unordered_map> data; + ~Sample() { + std::for_each(data.begin(), data.end(), [](std::pair> entry) { + ::free(entry.second.first); + }); + } +}; + +class SampleIngestor : std::enable_shared_from_this { +public: + virtual ~SampleIngestor(); + virtual void ingest(std::shared_ptr const& sample) = 0; + virtual void getConfig(std::map&) const = 0; +}; + +class NoneIngestor : public SampleIngestor { +public: + void ingest(std::shared_ptr const& sample) override {} + void getConfig(std::map& res) const override { res["ingestor"] = "none"; } +}; + +// The FluentD ingestor uses the pimpl idiom. This is to make compilation less heavy weight as this implementation has +// dependencies to boost::asio +struct FluentDIngestorImpl; + +class FluentDIngestor : public SampleIngestor { +public: // Public Types + enum class Protocol { TCP, UDP }; + +private: // members + FluentDIngestorImpl* impl; + +public: // interface + void ingest(std::shared_ptr const& sample) override; + FluentDIngestor(Protocol protocol, NetworkAddress& endpoint); + void getConfig(std::map& res) const override; + ~FluentDIngestor(); +}; + +struct ConfigError { + std::string description; +}; + +class ProfilerConfigT { +private: // private types + using Lock = std::unique_lock; + friend class crossbow::create_static; + +private: // members + std::shared_ptr ingestor = std::make_shared(); + +private: // construction + ProfilerConfigT() {} + ProfilerConfigT(ProfilerConfigT const&) = delete; + ProfilerConfigT& operator=(ProfilerConfigT const&) = delete; + void setBackend(std::shared_ptr ingestor) { this->ingestor = ingestor; } + +public: + void ingest(std::shared_ptr sample) { ingestor->ingest(sample); } + void reset(std::map const& config); + std::map getConfig() const; +}; + +using ProfilerConfig = crossbow::singleton; + +class SampleCollectorT { +public: // Types + friend struct crossbow::create_static; + using Getter = std::function>()>; + +private: + std::vector collectors; + std::map getSamples; + SampleCollectorT() {} + std::map collect(ActorLineage* lineage); + +public: + void addCollector(IALPCollectorBase* collector) { collectors.push_back(collector); } + std::shared_ptr collect(); + void addGetter(WaitState waitState, Getter const& getter) { getSamples[waitState] = getter; }; +}; + +using SampleCollector = crossbow::singleton; + +class SampleCollection_t { + friend struct crossbow::create_static; + using Lock = std::unique_lock; + SampleCollection_t() {} + + SampleCollector _collector; + mutable std::mutex mutex; + std::atomic windowSize = 0.0; + std::deque> data; + ProfilerConfig config; + Reference _currentLineage; + +public: + /** + * Define how many samples the collection shoul keep. The window size is defined by time dimension. + * + * \param duration How long a sample should be kept in the collection. + */ + void setWindowSize(double duration) { windowSize.store(duration); } + /** + * By default returns reference counted pointers of all samples. A window can be defined in terms of absolute time. + * + * \param from The minimal age of all returned samples. + * \param to The max age of all returned samples. + */ + std::vector> get(double from = 0.0, double to = std::numeric_limits::max()) const; + void collect(const Reference& lineage); + const SampleCollector& collector() const { return _collector; } + SampleCollector& collector() { return _collector; } + Reference getLineage() { return _currentLineage; } +}; + +using SampleCollection = crossbow::singleton; + +struct ProfilerImpl; + +namespace boost { +namespace asio { +// forward declare io_context because including boost asio is super expensive +class io_context; +} // namespace asio +} // namespace boost + +class ActorLineageProfilerT { + friend struct crossbow::create_static; + ProfilerImpl* impl; + SampleCollection collection; + ActorLineageProfilerT(); + +public: + ~ActorLineageProfilerT(); + void setFrequency(unsigned frequency); + boost::asio::io_context& context(); +}; + +using ActorLineageProfiler = crossbow::singleton; diff --git a/fdbclient/AnnotateActor.cpp b/fdbclient/AnnotateActor.cpp new file mode 100644 index 0000000000..80b9a8cec4 --- /dev/null +++ b/fdbclient/AnnotateActor.cpp @@ -0,0 +1,23 @@ +/* + * AnnotateActor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbclient/AnnotateActor.h" + +std::map>()>> samples; diff --git a/fdbclient/AnnotateActor.h b/fdbclient/AnnotateActor.h new file mode 100644 index 0000000000..757410b18a --- /dev/null +++ b/fdbclient/AnnotateActor.h @@ -0,0 +1,91 @@ +/* + * AnnotateActor.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "flow/flow.h" +#include "flow/network.h" + +#include + +// Used to manually instrument waiting actors to collect samples for the +// sampling profiler. +struct AnnotateActor { + unsigned index; + bool set; + + AnnotateActor() : set(false) {} + + AnnotateActor(LineageReference* lineage) : set(false) { +#ifdef ENABLE_SAMPLING + if (lineage->getPtr() != 0) { + index = g_network->getActorLineageSet().insert(*lineage); + set = (index != ActorLineageSet::npos); + } +#endif + } + + AnnotateActor(const AnnotateActor& other) = delete; + AnnotateActor(AnnotateActor&& other) = delete; + AnnotateActor& operator=(const AnnotateActor& other) = delete; + + AnnotateActor& operator=(AnnotateActor&& other) { + if (this == &other) { + return *this; + } + + this->index = other.index; + this->set = other.set; + + other.set = false; + + return *this; + } + + ~AnnotateActor() { +#ifdef ENABLE_SAMPLING + if (set) { + g_network->getActorLineageSet().erase(index); + } +#endif + } +}; + +enum class WaitState { Disk, Network, Running }; +// usually we shouldn't use `using namespace` in a header file, but literals should be safe as user defined literals +// need to be prefixed with `_` +using namespace std::literals; + +constexpr std::string_view to_string(WaitState st) { + switch (st) { + case WaitState::Disk: + return "Disk"sv; + case WaitState::Network: + return "Network"sv; + case WaitState::Running: + return "Running"sv; + default: + return ""sv; + } +} + +#ifdef ENABLE_SAMPLING +extern std::map>()>> samples; +#endif diff --git a/fdbclient/BackupAgentBase.actor.cpp b/fdbclient/BackupAgentBase.actor.cpp index 774f606344..484daa0984 100644 --- a/fdbclient/BackupAgentBase.actor.cpp +++ b/fdbclient/BackupAgentBase.actor.cpp @@ -330,7 +330,7 @@ void decodeBackupLogValue(Arena& arena, } } else { Version ver = key_version->rangeContaining(logValue.param1).value(); - //TraceEvent("ApplyMutation").detail("LogValue", logValue.toString()).detail("Version", version).detail("Ver", ver).detail("Apply", version > ver && ver != invalidVersion); + //TraceEvent("ApplyMutation").detail("LogValue", logValue).detail("Version", version).detail("Ver", ver).detail("Apply", version > ver && ver != invalidVersion); if (version > ver && ver != invalidVersion) { if (removePrefix.size()) { logValue.param1 = logValue.param1.removePrefix(removePrefix); diff --git a/fdbclient/CMakeLists.txt b/fdbclient/CMakeLists.txt index 7f51be87ba..56acffd53c 100644 --- a/fdbclient/CMakeLists.txt +++ b/fdbclient/CMakeLists.txt @@ -1,4 +1,7 @@ set(FDBCLIENT_SRCS + ActorLineageProfiler.h + ActorLineageProfiler.cpp + AnnotateActor.cpp AsyncFileS3BlobStore.actor.cpp AsyncFileS3BlobStore.actor.h AsyncTaskThread.actor.cpp @@ -39,6 +42,7 @@ set(FDBCLIENT_SRCS FDBOptions.h FDBTypes.cpp FDBTypes.h + FluentDSampleIngestor.cpp FileBackupAgent.actor.cpp GlobalConfig.h GlobalConfig.actor.h @@ -66,6 +70,8 @@ set(FDBCLIENT_SRCS MultiVersionTransaction.actor.cpp MultiVersionTransaction.h MutationList.h + NameLineage.h + NameLineage.cpp NativeAPI.actor.cpp NativeAPI.actor.h Notified.h @@ -101,6 +107,8 @@ set(FDBCLIENT_SRCS StorageServerInterface.h Subspace.cpp Subspace.h + StackLineage.h + StackLineage.cpp SystemData.cpp SystemData.h TagThrottle.actor.cpp @@ -172,8 +180,17 @@ endif() add_flow_target(STATIC_LIBRARY NAME fdbclient SRCS ${FDBCLIENT_SRCS} ADDL_SRCS ${options_srcs}) add_dependencies(fdbclient fdboptions fdb_c_options) +target_link_libraries(fdbclient PUBLIC fdbrpc msgpack) + +# Create a separate fdbclient library with sampling enabled. This lets +# fdbserver retain sampling functionality in client code while disabling +# sampling for pure clients. +add_flow_target(STATIC_LIBRARY NAME fdbclient_sampling SRCS ${FDBCLIENT_SRCS} ADDL_SRCS ${options_srcs}) +add_dependencies(fdbclient_sampling fdboptions fdb_c_options) +target_link_libraries(fdbclient_sampling PUBLIC fdbrpc_sampling msgpack) +target_compile_definitions(fdbclient_sampling PRIVATE -DENABLE_SAMPLING) + if(BUILD_AZURE_BACKUP) - target_link_libraries(fdbclient PUBLIC fdbrpc PRIVATE curl uuid azure-storage-lite) -else() - target_link_libraries(fdbclient PUBLIC fdbrpc) + target_link_libraries(fdbclient PRIVATE curl uuid azure-storage-lite) + target_link_libraries(fdbclient_sampling PRIVATE curl uuid azure-storage-lite) endif() diff --git a/fdbclient/ClientLogEvents.h b/fdbclient/ClientLogEvents.h index 5c4f42dfd6..e2b7bb55f3 100644 --- a/fdbclient/ClientLogEvents.h +++ b/fdbclient/ClientLogEvents.h @@ -22,6 +22,9 @@ #ifndef FDBCLIENT_CLIENTLOGEVENTS_H #define FDBCLIENT_CLIENTLOGEVENTS_H +#include "fdbclient/FDBTypes.h" +#include "fdbclient/CommitProxyInterface.h" + namespace FdbClientLogEvents { enum class EventType { GET_VERSION_LATENCY = 0, @@ -252,7 +255,7 @@ struct EventCommit : public Event { .setMaxEventLength(-1) .detail("TransactionID", id) .setMaxFieldLength(maxFieldLength) - .detail("Mutation", mutation.toString()); + .detail("Mutation", mutation); } TraceEvent("TransactionTrace_Commit") @@ -316,7 +319,7 @@ struct EventCommit_V2 : public Event { .setMaxEventLength(-1) .detail("TransactionID", id) .setMaxFieldLength(maxFieldLength) - .detail("Mutation", mutation.toString()); + .detail("Mutation", mutation); } TraceEvent("TransactionTrace_Commit") @@ -430,7 +433,7 @@ struct EventCommitError : public Event { .setMaxEventLength(-1) .detail("TransactionID", id) .setMaxFieldLength(maxFieldLength) - .detail("Mutation", mutation.toString()); + .detail("Mutation", mutation); } TraceEvent("TransactionTrace_CommitError").detail("TransactionID", id).detail("ErrCode", errCode); diff --git a/fdbclient/DatabaseBackupAgent.actor.cpp b/fdbclient/DatabaseBackupAgent.actor.cpp index 8ddc6d6be0..af47ddcea0 100644 --- a/fdbclient/DatabaseBackupAgent.actor.cpp +++ b/fdbclient/DatabaseBackupAgent.actor.cpp @@ -2355,7 +2355,7 @@ std::string getDRMutationStreamId(StatusObjectReader statusObj, const char* cont } } } - TraceEvent(SevWarn, "DBA_TagNotPresentInStatus").detail("Tag", tagName.toString()).detail("Context", context); + TraceEvent(SevWarn, "DBA_TagNotPresentInStatus").detail("Tag", tagName).detail("Context", context); throw backup_error(); } catch (std::runtime_error& e) { TraceEvent(SevWarn, "DBA_GetDRMutationStreamIdFail").detail("Error", e.what()); diff --git a/fdbclient/FluentDSampleIngestor.cpp b/fdbclient/FluentDSampleIngestor.cpp new file mode 100644 index 0000000000..b881ffbafe --- /dev/null +++ b/fdbclient/FluentDSampleIngestor.cpp @@ -0,0 +1,265 @@ +/* + * FluentDSampleIngestor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbclient/ActorLineageProfiler.h" +#include +#include +#include + +namespace { + +boost::asio::ip::address ipAddress(IPAddress const& n) { + if (n.isV6()) { + return boost::asio::ip::address_v6(n.toV6()); + } else { + return boost::asio::ip::address_v4(n.toV4()); + } +} + +template +boost::asio::ip::basic_endpoint toEndpoint(NetworkAddress const n) { + return boost::asio::ip::basic_endpoint(ipAddress(n.ip), n.port); +} + +struct FluentDSocket { + virtual ~FluentDSocket() {} + virtual void connect(NetworkAddress const& endpoint) = 0; + virtual void send(std::shared_ptr const& sample) = 0; + virtual const boost::system::error_code& failed() const = 0; +}; + +template +class SampleSender : public std::enable_shared_from_this> { + using Socket = typename Protocol::socket; + using Iter = typename decltype(Sample::data)::iterator; + Socket& socket; + Callback callback; + Iter iter, end; + std::shared_ptr sample_; // to keep from being deallocated + + struct Buf { + const char* data; + const unsigned size; + Buf(const char* data, unsigned size) : data(data), size(size) {} + Buf(Buf const&) = delete; + Buf& operator=(Buf const&) = delete; + ~Buf() { delete[] data; } + }; + + void sendCompletionHandler(boost::system::error_code const& ec) { + if (ec) { + callback(ec); + } else { + ++iter; + sendNext(); + } + } + + void send(boost::asio::ip::tcp::socket& socket, std::shared_ptr const& buf) { + boost::asio::async_write( + socket, + boost::asio::const_buffer(buf->data, buf->size), + [buf, this](auto const& ec, size_t) { + this->sendCompletionHandler(ec); + }); + } + void send(boost::asio::ip::udp::socket& socket, std::shared_ptr const& buf) { + socket.async_send( + boost::asio::const_buffer(buf->data, buf->size), + [buf, this](auto const& ec, size_t) { this->sendCompletionHandler(ec); }); + } + + void sendNext() { + if (iter == end) { + callback(boost::system::error_code()); + return; + } + // 1. calculate size of buffer + unsigned size = 1; // 1 for fixmap identifier byte + auto waitState = to_string(iter->first); + if (waitState.size() < 32) { + size += waitState.size() + 1; + } else { + size += waitState.size() + 2; + } + size += iter->second.second; + // 2. allocate the buffer + std::unique_ptr buf(new char[size]); + unsigned off = 0; + // 3. serialize fixmap + buf[off++] = 0x81; // map of size 1 + // 3.1 serialize key + if (waitState.size() < 32) { + buf[off++] = 0xa0 + waitState.size(); // fixstr + } else { + buf[off++] = 0xd9; + buf[off++] = char(waitState.size()); + } + memcpy(buf.get() + off, waitState.data(), waitState.size()); + off += waitState.size(); + // 3.2 append serialized value + memcpy(buf.get() + off, iter->second.first, iter->second.second); + // 4. send the result to fluentd + send(socket, std::make_shared(buf.release(), size)); + } + +public: + SampleSender(Socket& socket, Callback const& callback, std::shared_ptr const& sample) + : socket(socket), + callback(callback), + iter(sample->data.begin()), + end(sample->data.end()), + sample_(sample) { + sendNext(); + } +}; + +// Sample function to make instanciation of SampleSender easier +template +std::shared_ptr> makeSampleSender(typename Protocol::socket& socket, Callback const& callback, std::shared_ptr const& sample) { + return std::make_shared>(socket, callback, sample); +} + +template +struct FluentDSocketImpl : FluentDSocket, std::enable_shared_from_this> { + static constexpr unsigned MAX_QUEUE_SIZE = 100; + boost::asio::io_context& context; + typename Protocol::socket socket; + FluentDSocketImpl(boost::asio::io_context& context) : context(context), socket(context) {} + bool ready = false; + std::deque> queue; + boost::system::error_code _failed; + + const boost::system::error_code& failed() const override { return _failed; } + + void sendCompletionHandler(boost::system::error_code const& ec) { + if (ec) { + // TODO: trace error + _failed = ec; + return; + } + if (queue.empty()) { + ready = true; + } else { + auto sample = queue.front(); + queue.pop_front(); + sendImpl(sample); + } + } + + + void sendImpl(std::shared_ptr const& sample) { + makeSampleSender(socket, [self = this->shared_from_this()](boost::system::error_code const& ec){ + self->sendCompletionHandler(ec); + }, sample); + } + + void send(std::shared_ptr const& sample) override { + if (_failed) { + return; + } + if (ready) { + ready = false; + sendImpl(sample); + } else { + if (queue.size() < MAX_QUEUE_SIZE) { + queue.push_back(sample); + } // TODO: else trace a warning + } + } + + void connect(NetworkAddress const& endpoint) override { + auto to = toEndpoint(endpoint); + socket.async_connect(to, [self = this->shared_from_this()](boost::system::error_code const& ec) { + if (ec) { + // TODO: error handling + self->_failed = ec; + return; + } + self->ready = true; + }); + } +}; + +} // namespace + +struct FluentDIngestorImpl { + using Protocol = FluentDIngestor::Protocol; + Protocol protocol; + NetworkAddress endpoint; + boost::asio::io_context& io_context; + std::shared_ptr socket; + boost::asio::steady_timer retryTimer; + FluentDIngestorImpl(Protocol protocol, NetworkAddress const& endpoint) + : protocol(protocol), endpoint(endpoint), io_context(ActorLineageProfiler::instance().context()), + retryTimer(io_context) { + connect(); + } + + ~FluentDIngestorImpl() { retryTimer.cancel(); } + + void connect() { + switch (protocol) { + case Protocol::TCP: + socket.reset(new FluentDSocketImpl(io_context)); + break; + case Protocol::UDP: + socket.reset(new FluentDSocketImpl(io_context)); + break; + } + socket->connect(endpoint); + } + + void retry() { + retryTimer = boost::asio::steady_timer(io_context, std::chrono::seconds(1)); + retryTimer.async_wait([this](auto const& ec) { + if (ec) { + return; + } + connect(); + }); + socket.reset(); + } +}; + +FluentDIngestor::~FluentDIngestor() { + delete impl; +} + +FluentDIngestor::FluentDIngestor(Protocol protocol, NetworkAddress& endpoint) + : impl(new FluentDIngestorImpl(protocol, endpoint)) {} + +void FluentDIngestor::ingest(const std::shared_ptr& sample) { + if (!impl->socket) { + // the connection failed in the past and we wait for a timeout before we retry + return; + } else if (impl->socket->failed()) { + impl->retry(); + return; + } else { + impl->socket->send(sample); + } +} + +void FluentDIngestor::getConfig(std::map& res) const { + res["ingestor"] = "fluentd"; + res["collector_endpoint"] = impl->endpoint.toString(); + res["collector_protocol"] = impl->protocol == Protocol::TCP ? "tcp" : "udp"; +} diff --git a/fdbclient/GlobalConfig.actor.cpp b/fdbclient/GlobalConfig.actor.cpp index dfadd7b3eb..a0f813f17e 100644 --- a/fdbclient/GlobalConfig.actor.cpp +++ b/fdbclient/GlobalConfig.actor.cpp @@ -34,6 +34,9 @@ const KeyRef fdbClientInfoTxnSizeLimit = LiteralStringRef("config/fdb_client_inf const KeyRef transactionTagSampleRate = LiteralStringRef("config/transaction_tag_sample_rate"); const KeyRef transactionTagSampleCost = LiteralStringRef("config/transaction_tag_sample_cost"); +const KeyRef samplingFrequency = LiteralStringRef("visibility/sampling/frequency"); +const KeyRef samplingWindow = LiteralStringRef("visibility/sampling/window"); + GlobalConfig::GlobalConfig(Database& cx) : cx(cx), lastUpdate(0) {} GlobalConfig& GlobalConfig::globalConfig() { diff --git a/fdbclient/GlobalConfig.actor.h b/fdbclient/GlobalConfig.actor.h index 444f1ab697..746226a045 100644 --- a/fdbclient/GlobalConfig.actor.h +++ b/fdbclient/GlobalConfig.actor.h @@ -28,6 +28,7 @@ #include #include +#include #include #include @@ -49,6 +50,9 @@ extern const KeyRef fdbClientInfoTxnSizeLimit; extern const KeyRef transactionTagSampleRate; extern const KeyRef transactionTagSampleCost; +extern const KeyRef samplingFrequency; +extern const KeyRef samplingWindow; + // Structure used to hold the values stored by global configuration. The arena // is used as memory to store both the key and the value (the value is only // stored in the arena if it is an object; primitives are just copied). @@ -78,6 +82,7 @@ public: g_network->setGlobal(INetwork::enGlobalConfig, config); config->_updater = updater(config, dbInfo); // Bind changes in `db` to the `dbInfoChanged` AsyncTrigger. + // TODO: Change AsyncTrigger to a Reference forward(db, std::addressof(config->dbInfoChanged)); } else { GlobalConfig* config = reinterpret_cast(g_network->global(INetwork::enGlobalConfig)); @@ -137,9 +142,11 @@ public: Future onChange(); // Calls \ref fn when the value associated with \ref key is changed. \ref - // fn is passed the updated value for the key, or an empty optional if the - // key has been cleared. If the value is an allocated object, its memory - // remains in the control of the global configuration. + // key should be one of the string literals defined at the top of + // GlobalConfig.actor.cpp, to ensure memory validity. \ref fn is passed the + // updated value for the key, or an empty optional if the key has been + // cleared. If the value is an allocated object, its memory remains in the + // control of global configuration. void trigger(KeyRef key, std::function)> fn); private: @@ -171,6 +178,7 @@ private: AsyncTrigger configChanged; std::unordered_map> data; Version lastUpdate; + // The key should be a global config string literal key (see the top of this class). std::unordered_map)>> callbacks; }; diff --git a/fdbclient/NameLineage.cpp b/fdbclient/NameLineage.cpp new file mode 100644 index 0000000000..5f98cf73c4 --- /dev/null +++ b/fdbclient/NameLineage.cpp @@ -0,0 +1,25 @@ +/* + * NameLineage.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbclient/NameLineage.h" + +namespace { +NameLineageCollector nameLineageCollector; +} diff --git a/fdbclient/NameLineage.h b/fdbclient/NameLineage.h new file mode 100644 index 0000000000..f949aeb13a --- /dev/null +++ b/fdbclient/NameLineage.h @@ -0,0 +1,42 @@ +/* + * NameLineage.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "fdbclient/ActorLineageProfiler.h" + +struct NameLineage : LineageProperties { + static constexpr std::string_view name = "Actor"sv; + const char* actorName; +}; + +struct NameLineageCollector : IALPCollector { + NameLineageCollector() : IALPCollector() {} + std::optional collect(ActorLineage* lineage) override { + auto str = lineage->get(&NameLineage::actorName); + if (str.has_value()) { + return std::string_view(*str, std::strlen(*str)); + } else { + return {}; + } + } +}; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 0a6a474249..99c23f122f 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -32,6 +32,8 @@ #include "fdbrpc/FailureMonitor.h" #include "fdbrpc/MultiInterface.h" +#include "fdbclient/ActorLineageProfiler.h" +#include "fdbclient/AnnotateActor.h" #include "fdbclient/Atomic.h" #include "fdbclient/ClusterInterface.h" #include "fdbclient/CoordinationInterface.h" @@ -42,6 +44,7 @@ #include "fdbclient/KeyBackedTypes.h" #include "fdbclient/KeyRangeMap.h" #include "fdbclient/ManagementAPI.actor.h" +#include "fdbclient/NameLineage.h" #include "fdbclient/CommitProxyInterface.h" #include "fdbclient/MonitorLeader.h" #include "fdbclient/MutationList.h" @@ -50,6 +53,7 @@ #include "fdbclient/SpecialKeySpace.actor.h" #include "fdbclient/StorageServerInterface.h" #include "fdbclient/SystemData.h" +#include "fdbclient/TransactionLineage.h" #include "fdbclient/versions.h" #include "fdbrpc/LoadBalance.h" #include "fdbrpc/Net2FileSystem.h" @@ -87,6 +91,9 @@ using std::pair; namespace { +TransactionLineageCollector transactionLineageCollector; +NameLineageCollector nameLineageCollector; + template Future loadBalance( DatabaseContext* ctx, @@ -1260,6 +1267,14 @@ DatabaseContext::DatabaseContext(Reference( KeyRangeRef(LiteralStringRef("data_distribution/"), LiteralStringRef("data_distribution0")) .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin))); + registerSpecialKeySpaceModule( + SpecialKeySpace::MODULE::ACTORLINEAGE, + SpecialKeySpace::IMPLTYPE::READONLY, + std::make_unique(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ACTORLINEAGE))); + registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF, + SpecialKeySpace::IMPLTYPE::READWRITE, + std::make_unique(SpecialKeySpace::getModuleRange( + SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF))); } if (apiVersionAtLeast(630)) { registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, @@ -1761,6 +1776,8 @@ Database Database::createDatabase(Reference connFile, auto database = Database(db); GlobalConfig::create( database, Reference const>(clientInfo), std::addressof(clientInfo->get())); + GlobalConfig::globalConfig().trigger(samplingFrequency, samplingProfilerUpdateFrequency); + GlobalConfig::globalConfig().trigger(samplingWindow, samplingProfilerUpdateWindow); return database; } @@ -2744,8 +2761,10 @@ ACTOR Future watchValue(Future version, cx->invalidateCache(key); wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID)); } else if (e.code() == error_code_watch_cancelled || e.code() == error_code_process_behind) { - TEST(e.code() == error_code_watch_cancelled); // Too many watches on storage server, poll for changes + // clang-format off + TEST(e.code() == error_code_watch_cancelled); // Too many watches on the storage server, poll for changes instead TEST(e.code() == error_code_process_behind); // The storage servers are all behind + // clang-format on wait(delay(CLIENT_KNOBS->WATCH_POLLING_TIME, info.taskID)); } else if (e.code() == error_code_timed_out) { // The storage server occasionally times out watches in case // it was cancelled @@ -3333,6 +3352,7 @@ ACTOR Future getRange(Database cx, throw deterministicRandom()->randomChoice( std::vector{ transaction_too_old(), future_version() }); } + // state AnnotateActor annotation(currentLineage); GetKeyValuesReply _rep = wait(loadBalance(cx.getPtr(), beginServer.second, @@ -4093,8 +4113,7 @@ SpanID generateSpanID(int transactionTracingEnabled) { } } -Transaction::Transaction() - : info(TaskPriority::DefaultEndpoint, generateSpanID(true)), span(info.spanID, "Transaction"_loc) {} +Transaction::Transaction() = default; Transaction::Transaction(Database const& cx) : info(cx->taskID, generateSpanID(cx->transactionTracingEnabled)), numErrors(0), options(cx), @@ -6340,7 +6359,7 @@ void enableClientInfoLogging() { } ACTOR Future snapCreate(Database cx, Standalone snapCmd, UID snapUID) { - TraceEvent("SnapCreateEnter").detail("SnapCmd", snapCmd.toString()).detail("UID", snapUID); + TraceEvent("SnapCreateEnter").detail("SnapCmd", snapCmd).detail("UID", snapUID); try { loop { choose { @@ -6350,7 +6369,7 @@ ACTOR Future snapCreate(Database cx, Standalone snapCmd, UID sn ProxySnapRequest(snapCmd, snapUID, snapUID), cx->taskID, AtMostOnce::True))) { - TraceEvent("SnapCreateExit").detail("SnapCmd", snapCmd.toString()).detail("UID", snapUID); + TraceEvent("SnapCreateExit").detail("SnapCmd", snapCmd).detail("UID", snapUID); return Void(); } } diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h index 3636b30c46..a695dc4ae8 100644 --- a/fdbclient/NativeAPI.actor.h +++ b/fdbclient/NativeAPI.actor.h @@ -180,6 +180,9 @@ struct TransactionInfo { // prefix/ : '0' - any keys equal or larger than this key are (definitely) not conflicting keys std::shared_ptr> conflictingKeys; + // Only available so that Transaction can have a default constructor, for use in state variables + TransactionInfo() : taskID(), spanID(), useProvisionalProxies() {} + explicit TransactionInfo(TaskPriority taskID, SpanID spanID) : taskID(taskID), spanID(spanID), useProvisionalProxies(false) {} }; diff --git a/fdbclient/ProcessInterface.h b/fdbclient/ProcessInterface.h new file mode 100644 index 0000000000..815fab2005 --- /dev/null +++ b/fdbclient/ProcessInterface.h @@ -0,0 +1,81 @@ +/* + * ProcessInterface.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbclient/AnnotateActor.h" +#include "fdbclient/FDBTypes.h" +#include "fdbrpc/fdbrpc.h" + +constexpr UID WLTOKEN_PROCESS(-1, 12); + +struct ProcessInterface { + constexpr static FileIdentifier file_identifier = 985636; + RequestStream getInterface; + RequestStream actorLineage; + + template + void serialize(Ar& ar) { + serializer(ar, actorLineage); + } +}; + +struct GetProcessInterfaceRequest { + constexpr static FileIdentifier file_identifier = 7632546; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, reply); + } +}; + +// This type is used to send serialized sample data over the network. +struct SerializedSample { + constexpr static FileIdentifier file_identifier = 15785634; + + double time; + std::unordered_map data; + + template + void serialize(Ar& ar) { + serializer(ar, time, data); + } +}; + +struct ActorLineageReply { + constexpr static FileIdentifier file_identifier = 1887656; + std::vector samples; + + template + void serialize(Ar& ar) { + serializer(ar, samples); + } +}; + +struct ActorLineageRequest { + constexpr static FileIdentifier file_identifier = 11654765; + WaitState waitStateStart, waitStateEnd; + time_t timeStart, timeEnd; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, waitStateStart, waitStateEnd, timeStart, timeEnd, reply); + } +}; diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index 441699df2d..dd206411cc 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -21,6 +21,14 @@ #include "boost/lexical_cast.hpp" #include "boost/algorithm/string.hpp" +#include +#include + +#include + +#include "fdbclient/ActorLineageProfiler.h" +#include "fdbclient/Knobs.h" +#include "fdbclient/ProcessInterface.h" #include "fdbclient/GlobalConfig.actor.h" #include "fdbclient/SpecialKeySpace.actor.h" #include "flow/Arena.h" @@ -67,7 +75,12 @@ std::unordered_map SpecialKeySpace::moduleToB { SpecialKeySpace::MODULE::GLOBALCONFIG, KeyRangeRef(LiteralStringRef("\xff\xff/global_config/"), LiteralStringRef("\xff\xff/global_config0")) }, { SpecialKeySpace::MODULE::TRACING, - KeyRangeRef(LiteralStringRef("\xff\xff/tracing/"), LiteralStringRef("\xff\xff/tracing0")) } + KeyRangeRef(LiteralStringRef("\xff\xff/tracing/"), LiteralStringRef("\xff\xff/tracing0")) }, + { SpecialKeySpace::MODULE::ACTORLINEAGE, + KeyRangeRef(LiteralStringRef("\xff\xff/actor_lineage/"), LiteralStringRef("\xff\xff/actor_lineage0")) }, + { SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF, + KeyRangeRef(LiteralStringRef("\xff\xff/actor_profiler_conf/"), + LiteralStringRef("\xff\xff/actor_profiler_conf0")) } }; std::unordered_map SpecialKeySpace::managementApiCommandToRange = { @@ -104,6 +117,15 @@ std::unordered_map SpecialKeySpace::managementApiCommandT .withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) } }; +std::unordered_map SpecialKeySpace::actorLineageApiCommandToRange = { + { "state", + KeyRangeRef(LiteralStringRef("state/"), LiteralStringRef("state0")) + .withPrefix(moduleToBoundary[MODULE::ACTORLINEAGE].begin) }, + { "time", + KeyRangeRef(LiteralStringRef("time/"), LiteralStringRef("time0")) + .withPrefix(moduleToBoundary[MODULE::ACTORLINEAGE].begin) } +}; + std::set SpecialKeySpace::options = { "excluded/force", "failed/force", "excluded_locality/force", @@ -476,10 +498,10 @@ void SpecialKeySpace::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& r auto begin = writeImpls[range.begin]; auto end = writeImpls.rangeContainingKeyBefore(range.end)->value(); if (begin != end) { - TraceEvent(SevDebug, "SpecialKeySpaceCrossModuleClear").detail("Range", range.toString()); + TraceEvent(SevDebug, "SpecialKeySpaceCrossModuleClear").detail("Range", range); throw special_keys_cross_module_clear(); // ban cross module clear } else if (begin == nullptr) { - TraceEvent(SevDebug, "SpecialKeySpaceNoWriteModuleFound").detail("Range", range.toString()); + TraceEvent(SevDebug, "SpecialKeySpaceNoWriteModuleFound").detail("Range", range); throw special_keys_no_write_module_found(); } return begin->clear(ryw, range); @@ -1918,6 +1940,287 @@ void ClientProfilingImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& ke "Clear operation is forbidden for profile client. You can set it to default to disable profiling."); } +ActorLineageImpl::ActorLineageImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {} + +void parse(StringRef& val, int& i) { + i = std::stoi(val.toString()); +} + +void parse(StringRef& val, double& d) { + d = std::stod(val.toString()); +} + +void parse(StringRef& val, WaitState& w) { + if (val == LiteralStringRef("disk") || val == LiteralStringRef("Disk")) { + w = WaitState::Disk; + } else if (val == LiteralStringRef("network") || val == LiteralStringRef("Network")) { + w = WaitState::Network; + } else if (val == LiteralStringRef("running") || val == LiteralStringRef("Running")) { + w = WaitState::Running; + } else { + throw std::range_error("failed to parse run state"); + } +} + +void parse(StringRef& val, time_t& t) { + struct tm tm = { 0 }; + if (strptime(val.toString().c_str(), "%FT%T%z", &tm) == nullptr) { + throw std::invalid_argument("failed to parse ISO 8601 datetime"); + } + + long timezone = tm.tm_gmtoff; + t = timegm(&tm); + if (t == -1) { + throw std::runtime_error("failed to convert ISO 8601 datetime"); + } + t -= timezone; +} + +void parse(StringRef& val, NetworkAddress& a) { + auto address = NetworkAddress::parse(val.toString()); + if (!address.isValid()) { + throw std::invalid_argument("invalid host"); + } + a = address; +} + +// Base case function for parsing function below. +template +void parse(std::vector::iterator it, std::vector::iterator end, T& t1) { + if (it == end) { + return; + } + parse(*it, t1); +} + +// Given an iterator into a vector of string tokens, an iterator to the end of +// the search space in the vector (exclusive), and a list of references to +// types, parses each token in the vector into the associated type according to +// the order of the arguments. +// +// For example, given the vector ["1", "1.5", "127.0.0.1:4000"] and the +// argument list int a, double b, NetworkAddress c, after this function returns +// each parameter passed in will hold the parsed value from the token list. +// +// The appropriate parsing function must be implemented for the type you wish +// to parse. See the existing parsing functions above, and add your own if +// necessary. +template +void parse(std::vector::iterator it, std::vector::iterator end, T& t1, Types&... remaining) { + // Return as soon as all tokens have been parsed. This allows parameters + // passed at the end to act as optional parameters -- they will only be set + // if the value exists. + if (it == end) { + return; + } + + try { + parse(*it, t1); + parse(++it, end, remaining...); + } catch (Error& e) { + throw e; + } catch (std::exception& e) { + throw e; + } +} + +ACTOR static Future actorLineageGetRangeActor(ReadYourWritesTransaction* ryw, + KeyRef prefix, + KeyRangeRef kr) { + state RangeResult result; + + // Set default values for all fields. The default will be used if the field + // is missing in the key. + state NetworkAddress host; + state WaitState waitStateStart = WaitState{ 0 }; + state WaitState waitStateEnd = WaitState{ 2 }; + state time_t timeStart = 0; + state time_t timeEnd = std::numeric_limits::max(); + state int seqStart = 0; + state int seqEnd = std::numeric_limits::max(); + + state std::vector beginValues = kr.begin.removePrefix(prefix).splitAny("/"_sr); + state std::vector endValues = kr.end.removePrefix(prefix).splitAny("/"_sr); + // Require index (either "state" or "time") and address:port. + if (beginValues.size() < 2 || endValues.size() < 2) { + ryw->setSpecialKeySpaceErrorMsg("missing required parameters (index, host)"); + throw special_keys_api_failure(); + } + + state NetworkAddress endRangeHost; + try { + if (SpecialKeySpace::getActorLineageApiCommandRange("state").contains(kr)) { + // For the range \xff\xff/actor_lineage/state/ip:port/wait-state/time/seq + parse(beginValues.begin() + 1, beginValues.end(), host, waitStateStart, timeStart, seqStart); + if (kr.begin != kr.end) { + parse(endValues.begin() + 1, endValues.end(), endRangeHost, waitStateEnd, timeEnd, seqEnd); + } + } else if (SpecialKeySpace::getActorLineageApiCommandRange("time").contains(kr)) { + // For the range \xff\xff/actor_lineage/time/ip:port/time/wait-state/seq + parse(beginValues.begin() + 1, beginValues.end(), host, timeStart, waitStateStart, seqStart); + if (kr.begin != kr.end) { + parse(endValues.begin() + 1, endValues.end(), endRangeHost, timeEnd, waitStateEnd, seqEnd); + } + } else { + ryw->setSpecialKeySpaceErrorMsg("invalid index in actor_lineage"); + throw special_keys_api_failure(); + } + } catch (Error& e) { + if (e.code() != special_keys_api_failure().code()) { + ryw->setSpecialKeySpaceErrorMsg("failed to parse key"); + throw special_keys_api_failure(); + } else { + throw e; + } + } + + if (kr.begin != kr.end && host != endRangeHost) { + // The client doesn't know about all the hosts, so a get range covering + // multiple hosts has no way of knowing which IP:port combos to use. + ryw->setSpecialKeySpaceErrorMsg("the host must remain the same on both ends of the range"); + throw special_keys_api_failure(); + } + + // Open endpoint to target process on each call. This can be optimized at + // some point... + state ProcessInterface process; + process.getInterface = RequestStream(Endpoint({ host }, WLTOKEN_PROCESS)); + ProcessInterface p = wait(retryBrokenPromise(process.getInterface, GetProcessInterfaceRequest{})); + process = p; + + ActorLineageRequest actorLineageRequest; + actorLineageRequest.waitStateStart = waitStateStart; + actorLineageRequest.waitStateEnd = waitStateEnd; + actorLineageRequest.timeStart = timeStart; + actorLineageRequest.timeEnd = timeEnd; + ActorLineageReply reply = wait(process.actorLineage.getReply(actorLineageRequest)); + + time_t dt = 0; + int seq = -1; + for (const auto& sample : reply.samples) { + time_t datetime = (time_t)sample.time; + char buf[50]; + struct tm* tm; + tm = localtime(&datetime); + size_t size = strftime(buf, 50, "%FT%T%z", tm); + std::string date(buf, size); + + seq = dt == datetime ? seq + 1 : 0; + dt = datetime; + + for (const auto& [waitState, data] : sample.data) { + if (seq < seqStart) { continue; } + else if (seq >= seqEnd) { break; } + + std::ostringstream streamKey; + if (SpecialKeySpace::getActorLineageApiCommandRange("state").contains(kr)) { + streamKey << SpecialKeySpace::getActorLineageApiCommandPrefix("state").toString() << host.toString() + << "/" << to_string(waitState) << "/" << date; + } else if (SpecialKeySpace::getActorLineageApiCommandRange("time").contains(kr)) { + streamKey << SpecialKeySpace::getActorLineageApiCommandPrefix("time").toString() << host.toString() + << "/" << date << "/" << to_string(waitState); + } else { + ASSERT(false); + } + streamKey << "/" << seq; + + msgpack::object_handle oh = msgpack::unpack(data.data(), data.size()); + msgpack::object deserialized = oh.get(); + + std::ostringstream stream; + stream << deserialized; + + result.push_back_deep(result.arena(), KeyValueRef(streamKey.str(), stream.str())); + } + + if (sample.data.size() == 0) { + std::ostringstream streamKey; + if (SpecialKeySpace::getActorLineageApiCommandRange("state").contains(kr)) { + streamKey << SpecialKeySpace::getActorLineageApiCommandPrefix("state").toString() << host.toString() + << "/Running/" << date; + } else if (SpecialKeySpace::getActorLineageApiCommandRange("time").contains(kr)) { + streamKey << SpecialKeySpace::getActorLineageApiCommandPrefix("time").toString() << host.toString() + << "/" << date << "/Running"; + } else { + ASSERT(false); + } + streamKey << "/" << seq; + result.push_back_deep(result.arena(), KeyValueRef(streamKey.str(), "{}"_sr)); + } + } + + return result; +} + +Future ActorLineageImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const { + return actorLineageGetRangeActor(ryw, getKeyRange().begin, kr); +} + +namespace { +std::string_view to_string_view(StringRef sr) { + return std::string_view(reinterpret_cast(sr.begin()), sr.size()); +} +} // namespace + +ActorProfilerConf::ActorProfilerConf(KeyRangeRef kr) + : SpecialKeyRangeRWImpl(kr), config(ProfilerConfig::instance().getConfig()) {} + +Future ActorProfilerConf::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const { + RangeResult res; + std::string_view begin(to_string_view(kr.begin.removePrefix(range.begin))), + end(to_string_view(kr.end.removePrefix(range.begin))); + for (auto& p : config) { + if (p.first > end) { + break; + } else if (p.first > begin) { + KeyValueRef kv; + kv.key = StringRef(res.arena(), p.first).withPrefix(kr.begin, res.arena()); + kv.value = StringRef(res.arena(), p.second); + res.push_back(res.arena(), kv); + } + } + return res; +} + +void ActorProfilerConf::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) { + config[key.removePrefix(range.begin).toString()] = value.toString(); + ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional(value))); + didWrite = true; +} + +void ActorProfilerConf::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& kr) { + std::string begin(kr.begin.removePrefix(range.begin).toString()), end(kr.end.removePrefix(range.begin).toString()); + auto first = config.lower_bound(begin); + if (first == config.end()) { + // nothing to clear + return; + } + didWrite = true; + auto last = config.upper_bound(end); + config.erase(first, last); +} + +void ActorProfilerConf::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) { + std::string k = key.removePrefix(range.begin).toString(); + auto iter = config.find(k); + if (iter != config.end()) { + config.erase(iter); + } + didWrite = true; +} + +Future> ActorProfilerConf::commit(ReadYourWritesTransaction* ryw) { + Optional res{}; + try { + if (didWrite) { + ProfilerConfig::instance().reset(config); + } + return res; + } catch (ConfigError& err) { + return Optional{ err.description }; + } +} + MaintenanceImpl::MaintenanceImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {} // Used to read the healthZoneKey diff --git a/fdbclient/SpecialKeySpace.actor.h b/fdbclient/SpecialKeySpace.actor.h index ed7e6da46a..1bc985b2fc 100644 --- a/fdbclient/SpecialKeySpace.actor.h +++ b/fdbclient/SpecialKeySpace.actor.h @@ -140,6 +140,8 @@ public: class SpecialKeySpace { public: enum class MODULE { + ACTORLINEAGE, // Sampling data + ACTOR_PROFILER_CONF, // profiler configuration CLUSTERFILEPATH, CONFIGURATION, // Configuration of the cluster CONNECTIONSTRING, @@ -197,6 +199,12 @@ public: static KeyRef getManagementApiCommandPrefix(const std::string& command) { return managementApiCommandToRange.at(command).begin; } + static KeyRangeRef getActorLineageApiCommandRange(const std::string& command) { + return actorLineageApiCommandToRange.at(command); + } + static KeyRef getActorLineageApiCommandPrefix(const std::string& command) { + return actorLineageApiCommandToRange.at(command).begin; + } static Key getManagementApiCommandOptionSpecialKey(const std::string& command, const std::string& option); static const std::set& getManagementApiOptionsSet() { return options; } static const std::set& getTracingOptions() { return tracingOptions; } @@ -225,6 +233,7 @@ private: static std::unordered_map moduleToBoundary; static std::unordered_map managementApiCommandToRange; // management command to its special keys' range + static std::unordered_map actorLineageApiCommandToRange; static std::set options; // "/