From 153de33f571228345e9a6d98c84fac931d2365e9 Mon Sep 17 00:00:00 2001 From: Lukas Joswiak Date: Thu, 3 Jun 2021 15:10:04 -0700 Subject: [PATCH 001/401] Revert "Merge pull request #4802 from sfc-gh-ljoswiak/revert/actor-lineage" This reverts commit 6499fa178e8f65a22105c2cd062a67209b562973, reversing changes made to 15126319577f915f28aa6308bbf066dc7ec992a2. --- CMakeLists.txt | 1 + cmake/GetMsgpack.cmake | 20 ++ fdbclient/ActorLineageProfiler.cpp | 381 ++++++++++++++++++++++++ fdbclient/ActorLineageProfiler.h | 192 ++++++++++++ fdbclient/AnnotateActor.cpp | 23 ++ fdbclient/AnnotateActor.h | 85 ++++++ fdbclient/CMakeLists.txt | 9 +- fdbclient/FluentDSampleIngestor.cpp | 257 ++++++++++++++++ fdbclient/GlobalConfig.actor.cpp | 7 + fdbclient/GlobalConfig.actor.h | 12 + fdbclient/NativeAPI.actor.cpp | 20 +- fdbclient/ProcessInterface.h | 81 +++++ fdbclient/SpecialKeySpace.actor.cpp | 290 +++++++++++++++++- fdbclient/SpecialKeySpace.actor.h | 29 ++ fdbclient/TransactionLineage.cpp | 25 ++ fdbclient/TransactionLineage.h | 128 ++++++++ fdbrpc/AsyncFileKAIO.actor.h | 7 +- fdbrpc/FlowTests.actor.cpp | 4 + fdbrpc/IAsyncFile.h | 4 + fdbrpc/Locality.h | 1 + fdbrpc/Net2FileSystem.cpp | 4 + fdbrpc/Net2FileSystem.h | 3 + fdbrpc/sim2.actor.cpp | 11 + fdbrpc/simulator.h | 4 + fdbserver/CMakeLists.txt | 3 + fdbserver/ClusterController.actor.cpp | 4 +- fdbserver/CommitProxyServer.actor.cpp | 6 +- fdbserver/GrvProxyServer.actor.cpp | 5 + fdbserver/RoleLineage.actor.cpp | 25 ++ fdbserver/RoleLineage.actor.h | 67 +++++ fdbserver/SigStack.cpp | 23 ++ fdbserver/WorkerInterface.actor.h | 35 +++ fdbserver/fdbserver.actor.cpp | 38 ++- fdbserver/storageserver.actor.cpp | 11 + fdbserver/worker.actor.cpp | 71 ++++- flow/CMakeLists.txt | 1 + flow/Net2.actor.cpp | 12 +- flow/Platform.actor.cpp | 4 + flow/Platform.h | 6 +- flow/Profiler.actor.cpp | 3 + flow/WriteOnlySet.actor.cpp | 273 +++++++++++++++++ flow/WriteOnlySet.h | 162 ++++++++++ flow/actorcompiler/ActorCompiler.cs | 2 + 
flow/actorcompiler/actorcompiler.csproj | 108 +------ flow/actorcompiler/actorcompiler.sln | 34 +++ flow/flow.cpp | 21 ++ flow/flow.h | 188 +++++++++++- flow/genericactors.actor.h | 4 + flow/network.h | 4 + flow/singleton.h | 237 +++++++++++++++ tests/TestRunner/local_cluster.py | 2 +- 51 files changed, 2816 insertions(+), 131 deletions(-) create mode 100644 cmake/GetMsgpack.cmake create mode 100644 fdbclient/ActorLineageProfiler.cpp create mode 100644 fdbclient/ActorLineageProfiler.h create mode 100644 fdbclient/AnnotateActor.cpp create mode 100644 fdbclient/AnnotateActor.h create mode 100644 fdbclient/FluentDSampleIngestor.cpp create mode 100644 fdbclient/ProcessInterface.h create mode 100644 fdbclient/TransactionLineage.cpp create mode 100644 fdbclient/TransactionLineage.h create mode 100644 fdbserver/RoleLineage.actor.cpp create mode 100644 fdbserver/RoleLineage.actor.h create mode 100644 fdbserver/SigStack.cpp create mode 100644 flow/WriteOnlySet.actor.cpp create mode 100644 flow/WriteOnlySet.h create mode 100644 flow/actorcompiler/actorcompiler.sln create mode 100644 flow/singleton.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 08df8edfe0..3803330790 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -152,6 +152,7 @@ if(CMAKE_SYSTEM_NAME STREQUAL "FreeBSD") endif() include(CompileBoost) +include(GetMsgpack) add_subdirectory(flow) add_subdirectory(fdbrpc) add_subdirectory(fdbclient) diff --git a/cmake/GetMsgpack.cmake b/cmake/GetMsgpack.cmake new file mode 100644 index 0000000000..b3313f336e --- /dev/null +++ b/cmake/GetMsgpack.cmake @@ -0,0 +1,20 @@ +find_package(msgpack 3.3.0 EXACT QUIET CONFIG) + +add_library(msgpack INTERFACE) + +if(msgpack_FOUND) + target_link_libraries(msgpack INTERFACE msgpackc-cxx) +else() + include(ExternalProject) + ExternalProject_add(msgpackProject + URL "https://github.com/msgpack/msgpack-c/releases/download/cpp-3.3.0/msgpack-3.3.0.tar.gz" + URL_HASH SHA256=6e114d12a5ddb8cb11f669f83f32246e484a8addd0ce93f274996f1941c1f07b + 
CONFIGURE_COMMAND "" + BUILD_COMMAND "" + INSTALL_COMMAND "" + ) + + ExternalProject_Get_property(msgpackProject SOURCE_DIR) + target_include_directories(msgpack SYSTEM INTERFACE "${SOURCE_DIR}/include") + add_dependencies(msgpack msgpackProject) +endif() diff --git a/fdbclient/ActorLineageProfiler.cpp b/fdbclient/ActorLineageProfiler.cpp new file mode 100644 index 0000000000..9be7a60704 --- /dev/null +++ b/fdbclient/ActorLineageProfiler.cpp @@ -0,0 +1,381 @@ +/* + * ActorLineageProfiler.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "flow/flow.h" +#include "flow/singleton.h" +#include "fdbrpc/IAsyncFile.h" +#include "fdbclient/ActorLineageProfiler.h" +#include +#include +#include +#include + +using namespace std::literals; + +class Packer : public msgpack::packer { + struct visitor_t { + using VisitorMap = std::unordered_map>; + VisitorMap visitorMap; + + template + static void any_visitor(std::any const& val, Packer& packer) { + const T& v = std::any_cast(val); + packer.pack(v); + } + + template + struct populate_visitor_map; + template + struct populate_visitor_map { + static void populate(VisitorMap& map) { + map.emplace(std::type_index(typeid(Head)), any_visitor); + populate_visitor_map::populate(map); + } + }; + template + struct populate_visitor_map { + static void populate(VisitorMap&) {} + }; + + visitor_t() { + populate_visitor_map, + std::map, + std::map, + std::vector>>::populate(visitorMap); + } + + void visit(const std::any& val, Packer& packer) { + auto iter = visitorMap.find(val.type()); + if (iter == visitorMap.end()) { + TraceEvent(SevError, "PackerTypeNotFound").detail("Type", val.type().name()); + } else { + iter->second(val, packer); + } + } + }; + msgpack::sbuffer sbuffer; + // Initializing visitor_t involves building a type-map. As this is a relatively expensive operation, we don't want + // to do this each time we create a Packer object. So visitor_t is a stateless class and we only use it as a + // visitor. 
+ crossbow::singleton visitor; + +public: + Packer() : msgpack::packer(sbuffer) {} + + void pack(std::any const& val) { visitor->visit(val, *this); } + + void pack(bool val) { + if (val) { + pack_true(); + } else { + pack_false(); + } + } + + void pack(uint64_t val) { + if (val <= std::numeric_limits::max()) { + pack_uint8(uint8_t(val)); + } else if (val <= std::numeric_limits::max()) { + pack_uint16(uint16_t(val)); + } else if (val <= std::numeric_limits::max()) { + pack_uint32(uint32_t(val)); + } else { + pack_uint64(val); + } + } + + void pack(int64_t val) { + if (val >= 0) { + this->pack(uint64_t(val)); + } else if (val >= std::numeric_limits::min()) { + pack_int8(int8_t(val)); + } else if (val >= std::numeric_limits::min()) { + pack_int16(int16_t(val)); + } else if (val >= std::numeric_limits::min()) { + pack_int32(int32_t(val)); + } else if (val >= std::numeric_limits::min()) { + pack_int64(int64_t(val)); + } + } + + void pack(float val) { pack_float(val); } + void pack(double val) { pack_double(val); } + void pack(std::string const& str) { + pack_str(str.size()); + pack_str_body(str.data(), str.size()); + } + + void pack(std::string_view val) { + pack_str(val.size()); + pack_str_body(val.data(), val.size()); + } + + template + void pack(std::map const& map) { + pack_map(map.size()); + for (const auto& p : map) { + pack(p.first); + pack(p.second); + } + } + + template + void pack(std::vector const& val) { + pack_array(val.size()); + for (const auto& v : val) { + pack(v); + } + } + + std::pair getbuf() { + unsigned size = sbuffer.size(); + return std::make_pair(sbuffer.release(), size); + } +}; + +IALPCollectorBase::IALPCollectorBase() { + SampleCollector::instance().addCollector(this); +} + +std::map SampleCollectorT::collect(ActorLineage* lineage) { + std::map out; + for (auto& collector : collectors) { + auto val = collector->collect(lineage); + if (val.has_value()) { + out[collector->name()] = val.value(); + } + } + return out; +} + +std::shared_ptr 
SampleCollectorT::collect() { + auto sample = std::make_shared(); + double time = g_network->now(); + sample->time = time; + for (auto& p : getSamples) { + Packer packer; + std::vector> samples; + auto sampleVec = p.second(); + for (auto& val : sampleVec) { + auto m = collect(val.getPtr()); + if (!m.empty()) { + samples.emplace_back(std::move(m)); + } + } + if (!samples.empty()) { + packer.pack(samples); + sample->data[p.first] = packer.getbuf(); + } + } + return sample; +} + +void SampleCollection_t::refresh() { + auto sample = _collector->collect(); + auto min = std::min(sample->time - windowSize, sample->time); + { + Lock _{ mutex }; + data.emplace_back(std::move(sample)); + } + double oldest = data.front()->time; + // we don't need to check for data.empty() in this loop (or the inner loop) as we know that we will end + // up with at least one entry which is the most recent sample + while (oldest < min) { + Lock _{ mutex }; + // we remove at most 10 elements at a time. This is so we don't block the main thread for too long. 
+ for (int i = 0; i < 10 && oldest < min; ++i) { + data.pop_front(); + oldest = data.front()->time; + } + } + //config->ingest(sample); +} + +std::vector> SampleCollection_t::get(double from /*= 0.0*/, + double to /*= std::numeric_limits::max()*/) const { + Lock _{ mutex }; + std::vector> res; + for (const auto& sample : data) { + if (sample->time > to) { + break; + } else if (sample->time >= from) { + res.push_back(sample); + } + } + return res; +} + +struct ProfilerImpl { + boost::asio::io_context context; + boost::asio::executor_work_guard workGuard; + boost::asio::steady_timer timer; + std::thread mainThread; + unsigned frequency; + + SampleCollection collection; + + ProfilerImpl() : workGuard(context.get_executor()), timer(context) { + mainThread = std::thread([this]() { context.run(); }); + } + ~ProfilerImpl() { + setFrequency(0); + workGuard.reset(); + mainThread.join(); + } + + void profileHandler(boost::system::error_code const& ec) { + if (ec) { + return; + } + collection->refresh(); + timer = boost::asio::steady_timer(context, std::chrono::microseconds(1000000 / frequency)); + timer.async_wait([this](auto const& ec) { profileHandler(ec); }); + } + + void setFrequency(unsigned frequency) { + boost::asio::post(context, [this, frequency]() { + this->frequency = frequency; + timer.cancel(); + if (frequency > 0) { + profileHandler(boost::system::error_code{}); + } + }); + } +}; + +ActorLineageProfilerT::ActorLineageProfilerT() : impl(new ProfilerImpl()) { + collection->collector()->addGetter(WaitState::Network, + std::bind(&ActorLineageSet::copy, std::ref(g_network->getActorLineageSet()))); + collection->collector()->addGetter( + WaitState::Disk, + std::bind(&ActorLineageSet::copy, std::ref(IAsyncFileSystem::filesystem()->getActorLineageSet()))); + collection->collector()->addGetter(WaitState::Running, []() { + auto res = currentLineageThreadSafe.get(); + if (res.isValid()) { + return std::vector>({ res }); + } + return std::vector>(); + }); +} + 
+ActorLineageProfilerT::~ActorLineageProfilerT() { + delete impl; +} + +void ActorLineageProfilerT::setFrequency(unsigned frequency) { + impl->setFrequency(frequency); +} + +boost::asio::io_context& ActorLineageProfilerT::context() { + return impl->context; +} + +SampleIngestor::~SampleIngestor() {} + +// Callback used to update the sampling profilers run frequency whenever the +// frequency changes. +void samplingProfilerUpdateFrequency(std::optional freq) { + double frequency = 0; + if (freq.has_value()) { + frequency = std::any_cast(freq.value()); + } + TraceEvent(SevInfo, "SamplingProfilerUpdateFrequency").detail("Frequency", frequency); + ActorLineageProfiler::instance().setFrequency(frequency); +} + +void ProfilerConfigT::reset(std::map const& config) { + bool expectNoMore = false, useFluentD = false, useTCP = false; + std::string endpoint; + ConfigError err; + for (auto& kv : config) { + if (expectNoMore) { + err.description = format("Unexpected option %s", kv.first.c_str()); + throw err; + } + if (kv.first == "collector") { + std::string val = kv.second; + std::transform(val.begin(), val.end(), val.begin(), [](unsigned char c) { return char(std::tolower(c)); }); // lower-case in place so the match is case-insensitive + if (val == "none") { + setBackend(std::make_shared()); + } else if (val == "fluentd") { + useFluentD = true; + } else { + err.description = format("Unsupported collector: %s", val.c_str()); + throw err; + } + } else if (kv.first == "collector_endpoint") { + endpoint = kv.second; + } else if (kv.first == "collector_protocol") { + auto val = kv.second; + std::transform(val.begin(), val.end(), val.begin(), [](unsigned char c) { return char(std::tolower(c)); }); // lower-case in place so "TCP"/"UDP" are accepted too + if (val == "tcp") { + useTCP = true; + } else if (val == "udp") { + useTCP = false; + } else { + err.description = format("Unsupported protocol for fluentd: %s", kv.second.c_str()); + throw err; + } + } else { + err.description = format("Unknown option %s", kv.first.c_str()); + throw err; + } + } + if (useFluentD) { + if (endpoint.empty()) { + err.description = "Endpoint is required for fluentd 
ingestor"; + throw err; + } + NetworkAddress address; + try { + address = NetworkAddress::parse(endpoint); + } catch (Error& e) { + err.description = format("Can't parse address %s", endpoint.c_str()); + throw err; + } + setBackend(std::make_shared( + useTCP ? FluentDIngestor::Protocol::TCP : FluentDIngestor::Protocol::UDP, address)); // UDP unless "tcp" was configured + } +} + +std::map ProfilerConfigT::getConfig() const { + std::map res; + if (ingestor) { + ingestor->getConfig(res); + } + return res; +} + +// Callback used to update the sample collector window size. +void samplingProfilerUpdateWindow(std::optional window) { + double duration = 0; + if (window.has_value()) { + duration = std::any_cast(window.value()); + } + TraceEvent(SevInfo, "SamplingProfilerUpdateWindow").detail("Duration", duration); + SampleCollection::instance().setWindowSize(duration); +} diff --git a/fdbclient/ActorLineageProfiler.h b/fdbclient/ActorLineageProfiler.h new file mode 100644 index 0000000000..30a3eec3e7 --- /dev/null +++ b/fdbclient/ActorLineageProfiler.h @@ -0,0 +1,192 @@ +/* + * ActorLineageProfiler.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include "fdbclient/AnnotateActor.h" + +#include +#include +#include +#include +#include +#include +#include "flow/singleton.h" +#include "flow/flow.h" + +void samplingProfilerUpdateFrequency(std::optional freq); +void samplingProfilerUpdateWindow(std::optional window); + +struct IALPCollectorBase { + virtual std::optional collect(ActorLineage*) = 0; + virtual const std::string_view& name() = 0; + IALPCollectorBase(); +}; + +template +struct IALPCollector : IALPCollectorBase { + const std::string_view& name() override { return T::name; } +}; + +struct Sample : std::enable_shared_from_this { + double time = 0.0; + Sample() {} + Sample(Sample const&) = delete; + Sample& operator=(Sample const&) = delete; + std::unordered_map> data; + ~Sample() { + std::for_each(data.begin(), data.end(), [](std::pair> entry) { + ::free(entry.second.first); + }); + } +}; + +class SampleIngestor : std::enable_shared_from_this { +public: + virtual ~SampleIngestor(); + virtual void ingest(std::shared_ptr const& sample) = 0; + virtual void getConfig(std::map&) const = 0; +}; + +class NoneIngestor : public SampleIngestor { +public: + void ingest(std::shared_ptr const& sample) override {} + void getConfig(std::map& res) const override { res["ingestor"] = "none"; } +}; + +// The FluentD ingestor uses the pimpl idiom. 
This is to make compilation less heavy weight as this implementation has +// dependencies to boost::asio +struct FluentDIngestorImpl; + +class FluentDIngestor : public SampleIngestor { +public: // Public Types + enum class Protocol { TCP, UDP }; + +private: // members + FluentDIngestorImpl* impl; + +public: // interface + void ingest(std::shared_ptr const& sample) override; + FluentDIngestor(Protocol protocol, NetworkAddress& endpoint); + void getConfig(std::map& res) const override; + ~FluentDIngestor(); +}; + +struct ConfigError { + std::string description; +}; + +class ProfilerConfigT { +private: // private types + using Lock = std::unique_lock; + friend class crossbow::create_static; + +private: // members + std::shared_ptr ingestor = std::make_shared(); + +private: // construction + ProfilerConfigT() {} + ProfilerConfigT(ProfilerConfigT const&) = delete; + ProfilerConfigT& operator=(ProfilerConfigT const&) = delete; + void setBackend(std::shared_ptr ingestor) { this->ingestor = ingestor; } + +public: + void reset(std::map const& config); + std::map getConfig() const; +}; + +using ProfilerConfig = crossbow::singleton; + +class SampleCollectorT { +public: // Types + friend struct crossbow::create_static; + using Getter = std::function>()>; + +private: + std::vector collectors; + std::map getSamples; + SampleCollectorT() {} + std::map collect(ActorLineage* lineage); + +public: + void addCollector(IALPCollectorBase* collector) { collectors.push_back(collector); } + std::shared_ptr collect(); + void addGetter(WaitState waitState, Getter const& getter) { getSamples[waitState] = getter; }; +}; + +using SampleCollector = crossbow::singleton; + +class SampleCollection_t { + friend struct crossbow::create_static; + using Lock = std::unique_lock; + SampleCollection_t() {} + + SampleCollector _collector; + mutable std::mutex mutex; + std::atomic windowSize = 0.0; + std::deque> data; + ProfilerConfig config; + +public: + /** + * Define how many samples the collection shoul 
keep. The window size is defined by time dimension. + * + * \param duration How long a sample should be kept in the collection. + */ + void setWindowSize(double duration) { windowSize.store(duration); } + /** + * By default returns reference counted pointers of all samples. A window can be defined in terms of absolute time. + * + * \param from The minimal age of all returned samples. + * \param to The max age of all returned samples. + */ + std::vector> get(double from = 0.0, double to = std::numeric_limits::max()) const; + /** + * Collects all new samples from the sample collector and stores them in the collection. + */ + void refresh(); + const SampleCollector& collector() const { return _collector; } + SampleCollector& collector() { return _collector; } +}; + +using SampleCollection = crossbow::singleton; + +struct ProfilerImpl; + +namespace boost { +namespace asio { +// forward declare io_context because including boost asio is super expensive +class io_context; +} // namespace asio +} // namespace boost + +class ActorLineageProfilerT { + friend struct crossbow::create_static; + ProfilerImpl* impl; + SampleCollection collection; + ActorLineageProfilerT(); + +public: + ~ActorLineageProfilerT(); + void setFrequency(unsigned frequency); + boost::asio::io_context& context(); +}; + +using ActorLineageProfiler = crossbow::singleton; diff --git a/fdbclient/AnnotateActor.cpp b/fdbclient/AnnotateActor.cpp new file mode 100644 index 0000000000..80b9a8cec4 --- /dev/null +++ b/fdbclient/AnnotateActor.cpp @@ -0,0 +1,23 @@ +/* + * AnnotateActor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbclient/AnnotateActor.h" + +std::map>()>> samples; diff --git a/fdbclient/AnnotateActor.h b/fdbclient/AnnotateActor.h new file mode 100644 index 0000000000..dfc944fd02 --- /dev/null +++ b/fdbclient/AnnotateActor.h @@ -0,0 +1,85 @@ +/* + * AnnotateActor.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "flow/flow.h" +#include "flow/network.h" + +#include + +// Used to manually instrument waiting actors to collect samples for the +// sampling profiler. 
+struct AnnotateActor { + unsigned index; + bool set; + + AnnotateActor() : set(false) {} + + AnnotateActor(Reference lineage) : set(true) { + index = g_network->getActorLineageSet().insert(lineage); + if (index == ActorLineageSet::npos) { + set = false; + } + } + + AnnotateActor(const AnnotateActor& other) = delete; + AnnotateActor(AnnotateActor&& other) = delete; + AnnotateActor& operator=(const AnnotateActor& other) = delete; + + AnnotateActor& operator=(AnnotateActor&& other) { + if (this == &other) { + return *this; + } + + this->index = other.index; + this->set = other.set; + + other.set = false; + + return *this; + } + + ~AnnotateActor() { + if (set) { + g_network->getActorLineageSet().erase(index); + } + } +}; + +enum class WaitState { Disk, Network, Running }; +// usually we shouldn't use `using namespace` in a header file, but literals should be safe as user defined literals +// need to be prefixed with `_` +using namespace std::literals; + +constexpr std::string_view to_string(WaitState st) { + switch (st) { + case WaitState::Disk: + return "Disk"sv; + case WaitState::Network: + return "Network"sv; + case WaitState::Running: + return "Running"sv; + default: + return ""sv; + } +} + +extern std::map>()>> samples; diff --git a/fdbclient/CMakeLists.txt b/fdbclient/CMakeLists.txt index 75b9fd5a0a..1e669f2e00 100644 --- a/fdbclient/CMakeLists.txt +++ b/fdbclient/CMakeLists.txt @@ -1,4 +1,7 @@ set(FDBCLIENT_SRCS + ActorLineageProfiler.h + ActorLineageProfiler.cpp + AnnotateActor.cpp AsyncFileS3BlobStore.actor.cpp AsyncFileS3BlobStore.actor.h AsyncTaskThread.actor.cpp @@ -27,6 +30,7 @@ set(FDBCLIENT_SRCS EventTypes.actor.h FDBOptions.h FDBTypes.h + FluentDSampleIngestor.cpp FileBackupAgent.actor.cpp GlobalConfig.h GlobalConfig.actor.h @@ -141,8 +145,7 @@ endif() add_flow_target(STATIC_LIBRARY NAME fdbclient SRCS ${FDBCLIENT_SRCS} ADDL_SRCS ${options_srcs}) add_dependencies(fdbclient fdboptions) +target_link_libraries(fdbclient PUBLIC fdbrpc msgpack) 
if(BUILD_AZURE_BACKUP) - target_link_libraries(fdbclient PUBLIC fdbrpc PRIVATE curl uuid azure-storage-lite) -else() - target_link_libraries(fdbclient PUBLIC fdbrpc) + target_link_libraries(fdbclient PRIVATE curl uuid azure-storage-lite) endif() diff --git a/fdbclient/FluentDSampleIngestor.cpp b/fdbclient/FluentDSampleIngestor.cpp new file mode 100644 index 0000000000..08d5bfe55f --- /dev/null +++ b/fdbclient/FluentDSampleIngestor.cpp @@ -0,0 +1,257 @@ +/* + * FluentDSampleIngestor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "fdbclient/ActorLineageProfiler.h" +#include +#include +#include + +namespace { + +boost::asio::ip::address ipAddress(IPAddress const& n) { + if (n.isV6()) { + return boost::asio::ip::address_v6(n.toV6()); + } else { + return boost::asio::ip::address_v4(n.toV4()); + } +} + +template +boost::asio::ip::basic_endpoint toEndpoint(NetworkAddress const n) { + return boost::asio::ip::basic_endpoint(ipAddress(n.ip), n.port); +} + +struct FluentDSocket { + virtual ~FluentDSocket() {} + virtual void connect(NetworkAddress const& endpoint) = 0; + virtual void send(std::shared_ptr const& sample) = 0; + virtual const boost::system::error_code& failed() const = 0; +}; + +template +class SampleSender : public std::enable_shared_from_this> { + using Socket = typename Protocol::socket; + using Iter = typename decltype(Sample::data)::iterator; + Socket& socket; + Callback callback; + Iter iter, end; + + struct Buf { + const char* data; + const unsigned size; + Buf(const char* data, unsigned size) : data(data), size(size) {} + Buf(Buf const&) = delete; + Buf& operator=(Buf const&) = delete; + ~Buf() { delete[] data; } + }; + + void sendCompletionHandler(boost::system::error_code const& ec) { + if (ec) { + callback(ec); + } else { + ++iter; + sendNext(); + } + } + + void send(boost::asio::ip::tcp::socket& socket, std::shared_ptr const& buf) { + boost::asio::async_write( + socket, + boost::asio::const_buffer(buf->data, buf->size), + [buf, self = this->shared_from_this()](auto const& ec, size_t) { self->sendCompletionHandler(ec); }); + } + void send(boost::asio::ip::udp::socket& socket, std::shared_ptr const& buf) { + socket.async_send( + boost::asio::const_buffer(buf->data, buf->size), + [buf, self = this->shared_from_this()](auto const& ec, size_t) { self->sendCompletionHandler(ec); }); + } + + void sendNext() { + if (iter == end) { + callback(boost::system::error_code()); return; // all entries sent; iter == end must never be dereferenced below + } + // 1. 
calculate size of buffer + unsigned size = 1; // 1 for fixmap identifier byte + auto waitState = to_string(iter->first); + if (waitState.size() < 32) { + size += waitState.size() + 1; // fixstr: 1 header byte + payload, on top of the fixmap byte + } else { + size += waitState.size() + 2; // str 8: 2 header bytes + payload, on top of the fixmap byte + } + size += iter->second.second; + // 2. allocate the buffer + std::unique_ptr buf(new char[size]); + unsigned off = 0; + // 3. serialize fixmap + buf[off++] = 0x81; // map of size 1 + // 3.1 serialize key + if (waitState.size() < 32) { + buf[off++] = 0xa0 + waitState.size(); // fixstr + } else { + buf[off++] = 0xd9; + buf[off++] = char(waitState.size()); + } + memcpy(buf.get() + off, waitState.data(), waitState.size()); + off += waitState.size(); + // 3.2 append serialized value + memcpy(buf.get() + off, iter->second.first, iter->second.second); + // 4. send the result to fluentd + send(socket, std::make_shared(buf.release(), size)); + } + +public: + SampleSender(Socket& socket, Callback const& callback, std::shared_ptr const& sample) + : socket(socket), callback(callback), iter(sample->data.begin()), end(sample->data.end()) { + sendNext(); // NOTE(review): runs before any shared_ptr owns *this, so shared_from_this() in send() would throw std::bad_weak_ptr - confirm and move to a post-construction start step + } +}; + +// Sample function to make instantiation of SampleSender easier +template +std::shared_ptr> makeSampleSender(typename Protocol::socket& socket, Callback const& callback, std::shared_ptr const& sample) { + return std::make_shared>(socket, callback, sample); +} + +template +struct FluentDSocketImpl : FluentDSocket, std::enable_shared_from_this> { + static constexpr unsigned MAX_QUEUE_SIZE = 100; + boost::asio::io_context& context; + typename Protocol::socket socket; + FluentDSocketImpl(boost::asio::io_context& context) : context(context), socket(context) {} + bool ready = false; + std::deque> queue; + boost::system::error_code _failed; + + const boost::system::error_code& failed() const override { return _failed; } + + void sendCompletionHandler(boost::system::error_code const& ec) { + if (ec) { + // TODO: trace error + _failed = ec; + return; + } + if (queue.empty()) { + ready = true; + } else { 
+ auto sample = queue.front(); + queue.pop_front(); + sendImpl(sample); + } + } + + + void sendImpl(std::shared_ptr const& sample) { + makeSampleSender(socket, [self = this->shared_from_this()](boost::system::error_code const& ec){ + self->sendCompletionHandler(ec); + }, sample); + } + + void send(std::shared_ptr const& sample) override { + if (_failed) { + return; + } + if (ready) { + ready = false; + sendImpl(sample); + } else { + if (queue.size() < MAX_QUEUE_SIZE) { + queue.push_back(sample); + } // TODO: else trace a warning + } + } + + void connect(NetworkAddress const& endpoint) override { + auto to = toEndpoint(endpoint); + socket.async_connect(to, [self = this->shared_from_this()](boost::system::error_code const& ec) { + if (ec) { + // TODO: error handling + self->_failed = ec; + return; + } + self->ready = true; + }); + } +}; + +} // namespace + +struct FluentDIngestorImpl { + using Protocol = FluentDIngestor::Protocol; + Protocol protocol; + NetworkAddress endpoint; + boost::asio::io_context& io_context; + std::unique_ptr socket; + boost::asio::steady_timer retryTimer; + FluentDIngestorImpl(Protocol protocol, NetworkAddress const& endpoint) + : protocol(protocol), endpoint(endpoint), io_context(ActorLineageProfiler::instance().context()), + retryTimer(io_context) { + connect(); + } + + ~FluentDIngestorImpl() { retryTimer.cancel(); } + + void connect() { + switch (protocol) { + case Protocol::TCP: + socket.reset(new FluentDSocketImpl(io_context)); + break; + case Protocol::UDP: + socket.reset(new FluentDSocketImpl(io_context)); + break; + } + socket->connect(endpoint); + } + + void retry() { + retryTimer = boost::asio::steady_timer(io_context, std::chrono::seconds(1)); + retryTimer.async_wait([this](auto const& ec) { + if (ec) { + return; + } + connect(); + }); + socket.reset(); + } +}; + +FluentDIngestor::~FluentDIngestor() { + delete impl; +} + +FluentDIngestor::FluentDIngestor(Protocol protocol, NetworkAddress& endpoint) + : impl(new 
FluentDIngestorImpl(protocol, endpoint)) {} + +void FluentDIngestor::ingest(const std::shared_ptr& sample) { + if (!impl->socket) { + // the connection failed in the past and we wait for a timeout before we retry + return; + } else if (impl->socket->failed()) { + impl->retry(); + return; + } else { + impl->socket->send(sample); + } +} + +void FluentDIngestor::getConfig(std::map& res) const { + res["ingestor"] = "fluentd"; + res["collector_endpoint"] = impl->endpoint.toString(); + res["collector_protocol"] = impl->protocol == Protocol::TCP ? "tcp" : "udp"; +} diff --git a/fdbclient/GlobalConfig.actor.cpp b/fdbclient/GlobalConfig.actor.cpp index a5b4febdea..9dcffd9861 100644 --- a/fdbclient/GlobalConfig.actor.cpp +++ b/fdbclient/GlobalConfig.actor.cpp @@ -34,6 +34,9 @@ const KeyRef fdbClientInfoTxnSizeLimit = LiteralStringRef("config/fdb_client_inf const KeyRef transactionTagSampleRate = LiteralStringRef("config/transaction_tag_sample_rate"); const KeyRef transactionTagSampleCost = LiteralStringRef("config/transaction_tag_sample_cost"); +const KeyRef samplingFrequency = LiteralStringRef("visibility/sampling/frequency"); +const KeyRef samplingWindow = LiteralStringRef("visibility/sampling/window"); + GlobalConfig::GlobalConfig(Database& cx) : cx(cx), lastUpdate(0) {} GlobalConfig& GlobalConfig::globalConfig() { @@ -42,6 +45,10 @@ GlobalConfig& GlobalConfig::globalConfig() { return *reinterpret_cast(res); } +void GlobalConfig::updateDBInfo(Reference> dbInfo) { + // this->dbInfo = dbInfo; +} + Key GlobalConfig::prefixedKey(KeyRef key) { return key.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::GLOBALCONFIG).begin); } diff --git a/fdbclient/GlobalConfig.actor.h b/fdbclient/GlobalConfig.actor.h index 2d63d8de60..92a51d5260 100644 --- a/fdbclient/GlobalConfig.actor.h +++ b/fdbclient/GlobalConfig.actor.h @@ -27,7 +27,9 @@ #define FDBCLIENT_GLOBALCONFIG_ACTOR_H #include +#include #include +#include #include #include @@ -49,6 +51,9 @@ extern const KeyRef 
fdbClientInfoTxnSizeLimit; extern const KeyRef transactionTagSampleRate; extern const KeyRef transactionTagSampleCost; +extern const KeyRef samplingFrequency; +extern const KeyRef samplingWindow; + // Structure used to hold the values stored by global configuration. The arena // is used as memory to store both the key and the value (the value is only // stored in the arena if it is an object; primitives are just copied). @@ -90,6 +95,13 @@ public: // configuration. static GlobalConfig& globalConfig(); + // Updates the ClientDBInfo object used by global configuration to read new + // data. For server processes, this value needs to be set by the cluster + // controller, but global config is initialized before the cluster + // controller is, so this function provides a mechanism to update the + // object after initialization. + void updateDBInfo(Reference> dbInfo); + // Use this function to turn a global configuration key defined above into // the full path needed to set the value in the database. 
// diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 5e07c1c38d..7b6c17092e 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -32,6 +32,8 @@ #include "fdbrpc/FailureMonitor.h" #include "fdbrpc/MultiInterface.h" +#include "fdbclient/ActorLineageProfiler.h" +#include "fdbclient/AnnotateActor.h" #include "fdbclient/Atomic.h" #include "fdbclient/ClusterInterface.h" #include "fdbclient/CoordinationInterface.h" @@ -49,6 +51,7 @@ #include "fdbclient/SpecialKeySpace.actor.h" #include "fdbclient/StorageServerInterface.h" #include "fdbclient/SystemData.h" +#include "fdbclient/TransactionLineage.h" #include "fdbclient/versions.h" #include "fdbrpc/LoadBalance.h" #include "fdbrpc/Net2FileSystem.h" @@ -86,6 +89,8 @@ using std::pair; namespace { +TransactionLineageCollector transactionLineageCollector; + template Future loadBalance( DatabaseContext* ctx, @@ -1259,6 +1264,14 @@ DatabaseContext::DatabaseContext(Reference( KeyRangeRef(LiteralStringRef("data_distribution/"), LiteralStringRef("data_distribution0")) .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin))); + registerSpecialKeySpaceModule( + SpecialKeySpace::MODULE::ACTORLINEAGE, + SpecialKeySpace::IMPLTYPE::READONLY, + std::make_unique(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ACTORLINEAGE))); + registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF, + SpecialKeySpace::IMPLTYPE::READWRITE, + std::make_unique(SpecialKeySpace::getModuleRange( + SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF))); } if (apiVersionAtLeast(630)) { registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, @@ -1754,6 +1767,8 @@ Database Database::createDatabase(Reference connFile, auto database = Database(db); GlobalConfig::create(database, clientInfo, std::addressof(clientInfo->get())); + GlobalConfig::globalConfig().trigger(samplingFrequency, samplingProfilerUpdateFrequency); + 
GlobalConfig::globalConfig().trigger(samplingWindow, samplingProfilerUpdateWindow); return database; } @@ -2710,8 +2725,10 @@ ACTOR Future watchValue(Future version, cx->invalidateCache(key); wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID)); } else if (e.code() == error_code_watch_cancelled || e.code() == error_code_process_behind) { - TEST(e.code() == error_code_watch_cancelled); // Too many watches on storage server, poll for changes + // clang-format off + TEST(e.code() == error_code_watch_cancelled); // Too many watches on the storage server, poll for changes instead TEST(e.code() == error_code_process_behind); // The storage servers are all behind + // clang-format on wait(delay(CLIENT_KNOBS->WATCH_POLLING_TIME, info.taskID)); } else if (e.code() == error_code_timed_out) { // The storage server occasionally times out watches in case // it was cancelled @@ -3299,6 +3316,7 @@ ACTOR Future getRange(Database cx, throw deterministicRandom()->randomChoice( std::vector{ transaction_too_old(), future_version() }); } + state AnnotateActor annotation(currentLineage); GetKeyValuesReply _rep = wait(loadBalance(cx.getPtr(), beginServer.second, diff --git a/fdbclient/ProcessInterface.h b/fdbclient/ProcessInterface.h new file mode 100644 index 0000000000..11bafc2987 --- /dev/null +++ b/fdbclient/ProcessInterface.h @@ -0,0 +1,81 @@ +/* + * ProcessInterface.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbclient/AnnotateActor.h" +#include "fdbclient/FDBTypes.h" +#include "fdbrpc/fdbrpc.h" + +constexpr UID WLTOKEN_PROCESS(-1, 11); + +struct ProcessInterface { + constexpr static FileIdentifier file_identifier = 985636; + RequestStream getInterface; + RequestStream actorLineage; + + template + void serialize(Ar& ar) { + serializer(ar, actorLineage); + } +}; + +struct GetProcessInterfaceRequest { + constexpr static FileIdentifier file_identifier = 7632546; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, reply); + } +}; + +// This type is used to send serialized sample data over the network. +struct SerializedSample { + constexpr static FileIdentifier file_identifier = 15785634; + + double time; + std::unordered_map data; + + template + void serialize(Ar& ar) { + serializer(ar, time, data); + } +}; + +struct ActorLineageReply { + constexpr static FileIdentifier file_identifier = 1887656; + std::vector samples; + + template + void serialize(Ar& ar) { + serializer(ar, samples); + } +}; + +struct ActorLineageRequest { + constexpr static FileIdentifier file_identifier = 11654765; + WaitState waitStateStart, waitStateEnd; + time_t timeStart, timeEnd; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, waitStateStart, waitStateEnd, timeStart, timeEnd, reply); + } +}; diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index d51d27a23c..8d0c7e5361 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -21,6 +21,14 @@ #include "boost/lexical_cast.hpp" #include "boost/algorithm/string.hpp" +#include +#include + +#include + +#include "fdbclient/ActorLineageProfiler.h" +#include "fdbclient/Knobs.h" +#include "fdbclient/ProcessInterface.h" #include "fdbclient/GlobalConfig.actor.h" #include "fdbclient/SpecialKeySpace.actor.h" 
#include "flow/Arena.h" @@ -67,7 +75,12 @@ std::unordered_map SpecialKeySpace::moduleToB { SpecialKeySpace::MODULE::GLOBALCONFIG, KeyRangeRef(LiteralStringRef("\xff\xff/global_config/"), LiteralStringRef("\xff\xff/global_config0")) }, { SpecialKeySpace::MODULE::TRACING, - KeyRangeRef(LiteralStringRef("\xff\xff/tracing/"), LiteralStringRef("\xff\xff/tracing0")) } + KeyRangeRef(LiteralStringRef("\xff\xff/tracing/"), LiteralStringRef("\xff\xff/tracing0")) }, + { SpecialKeySpace::MODULE::ACTORLINEAGE, + KeyRangeRef(LiteralStringRef("\xff\xff/actor_lineage/"), LiteralStringRef("\xff\xff/actor_lineage0")) }, + { SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF, + KeyRangeRef(LiteralStringRef("\xff\xff/actor_profiler_conf/"), + LiteralStringRef("\xff\xff/actor_profiler_conf0")) } }; std::unordered_map SpecialKeySpace::managementApiCommandToRange = { @@ -98,6 +111,15 @@ std::unordered_map SpecialKeySpace::managementApiCommandT .withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) } }; +std::unordered_map SpecialKeySpace::actorLineageApiCommandToRange = { + { "state", + KeyRangeRef(LiteralStringRef("state/"), LiteralStringRef("state0")) + .withPrefix(moduleToBoundary[MODULE::ACTORLINEAGE].begin) }, + { "time", + KeyRangeRef(LiteralStringRef("time/"), LiteralStringRef("time0")) + .withPrefix(moduleToBoundary[MODULE::ACTORLINEAGE].begin) } +}; + std::set SpecialKeySpace::options = { "excluded/force", "failed/force" }; std::set SpecialKeySpace::tracingOptions = { kTracingTransactionIdKey, kTracingTokenKey }; @@ -1908,6 +1930,272 @@ void ClientProfilingImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& ke "Clear operation is forbidden for profile client. 
You can set it to default to disable profiling."); } +ActorLineageImpl::ActorLineageImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {} + +void parse(StringRef& val, int& i) { + i = std::stoi(val.toString()); +} + +void parse(StringRef& val, double& d) { + d = std::stod(val.toString()); +} + +void parse(StringRef& val, WaitState& w) { + if (val == LiteralStringRef("disk")) { + w = WaitState::Disk; + } else if (val == LiteralStringRef("network")) { + w = WaitState::Network; + } else if (val == LiteralStringRef("running")) { + w = WaitState::Running; + } else { + throw std::range_error("failed to parse run state"); + } +} + +void parse(StringRef& val, time_t& t) { + struct tm tm = { 0 }; + if (strptime(val.toString().c_str(), "%FT%T%z", &tm) == nullptr) { + throw std::invalid_argument("failed to parse ISO 8601 datetime"); + } + + long timezone = tm.tm_gmtoff; + t = timegm(&tm); + if (t == -1) { + throw std::runtime_error("failed to convert ISO 8601 datetime"); + } + t -= timezone; +} + +void parse(StringRef& val, NetworkAddress& a) { + auto address = NetworkAddress::parse(val.toString()); + if (!address.isValid()) { + throw std::invalid_argument("invalid host"); + } + a = address; +} + +// Base case function for parsing function below. +template +void parse(std::vector::iterator it, std::vector::iterator end, T& t1) { + if (it == end) { + return; + } + parse(*it, t1); +} + +// Given an iterator into a vector of string tokens, an iterator to the end of +// the search space in the vector (exclusive), and a list of references to +// types, parses each token in the vector into the associated type according to +// the order of the arguments. +// +// For example, given the vector ["1", "1.5", "127.0.0.1:4000"] and the +// argument list int a, double b, NetworkAddress c, after this function returns +// each parameter passed in will hold the parsed value from the token list. +// +// The appropriate parsing function must be implemented for the type you wish +// to parse. 
See the existing parsing functions above, and add your own if +// necessary. +template +void parse(std::vector::iterator it, std::vector::iterator end, T& t1, Types&... remaining) { + // Return as soon as all tokens have been parsed. This allows parameters + // passed at the end to act as optional parameters -- they will only be set + // if the value exists. + if (it == end) { + return; + } + + try { + parse(*it, t1); + parse(++it, end, remaining...); + } catch (Error& e) { + throw e; + } catch (std::exception& e) { + throw e; + } +} + +ACTOR static Future actorLineageGetRangeActor(ReadYourWritesTransaction* ryw, + KeyRef prefix, + KeyRangeRef kr) { + state RangeResult result; + + // Set default values for all fields. The default will be used if the field + // is missing in the key. + state NetworkAddress host; + state WaitState waitStateStart = WaitState{ 0 }; + state WaitState waitStateEnd = WaitState{ 2 }; + state time_t timeStart = 0; + state time_t timeEnd = std::numeric_limits::max(); + state int seqStart = 0; + state int seqEnd = std::numeric_limits::max(); + + state std::vector beginValues = kr.begin.removePrefix(prefix).splitAny("/"_sr); + state std::vector endValues = kr.end.removePrefix(prefix).splitAny("/"_sr); + // Require index (either "state" or "time") and address:port. 
+ if (beginValues.size() < 2 || endValues.size() < 2) { + ryw->setSpecialKeySpaceErrorMsg("missing required parameters (index, host)"); + throw special_keys_api_failure(); + } + + state NetworkAddress endRangeHost; + try { + if (SpecialKeySpace::getActorLineageApiCommandRange("state").contains(kr)) { + // For the range \xff\xff/actor_lineage/state/ip:port/wait-state/time/seq + parse(beginValues.begin() + 1, beginValues.end(), host, waitStateStart, timeStart, seqStart); + if (kr.begin != kr.end) { + parse(endValues.begin() + 1, endValues.end(), endRangeHost, waitStateEnd, timeEnd, seqEnd); + } + } else if (SpecialKeySpace::getActorLineageApiCommandRange("time").contains(kr)) { + // For the range \xff\xff/actor_lineage/time/ip:port/time/wait-state/seq + parse(beginValues.begin() + 1, beginValues.end(), host, timeStart, waitStateStart, seqStart); + if (kr.begin != kr.end) { + parse(endValues.begin() + 1, endValues.end(), endRangeHost, timeEnd, waitStateEnd, seqEnd); + } + } else { + ryw->setSpecialKeySpaceErrorMsg("invalid index in actor_lineage"); + throw special_keys_api_failure(); + } + } catch (Error& e) { + if (e.code() != special_keys_api_failure().code()) { + ryw->setSpecialKeySpaceErrorMsg("failed to parse key"); + throw special_keys_api_failure(); + } else { + throw e; + } + } + + if (kr.begin != kr.end && host != endRangeHost) { + // The client doesn't know about all the hosts, so a get range covering + // multiple hosts has no way of knowing which IP:port combos to use. + ryw->setSpecialKeySpaceErrorMsg("the host must remain the same on both ends of the range"); + throw special_keys_api_failure(); + } + + // Open endpoint to target process on each call. This can be optimized at + // some point... 
+ state ProcessInterface process; + process.getInterface = RequestStream(Endpoint({ host }, WLTOKEN_PROCESS)); + ProcessInterface p = wait(retryBrokenPromise(process.getInterface, GetProcessInterfaceRequest{})); + process = p; + + ActorLineageRequest actorLineageRequest; + actorLineageRequest.waitStateStart = waitStateStart; + actorLineageRequest.waitStateEnd = waitStateEnd; + actorLineageRequest.timeStart = timeStart; + actorLineageRequest.timeEnd = timeEnd; + ActorLineageReply reply = wait(process.actorLineage.getReply(actorLineageRequest)); + + time_t dt = 0; + int seq = -1; + for (const auto& sample : reply.samples) { + for (const auto& [waitState, data] : sample.data) { + time_t datetime = (time_t)sample.time; + seq = dt == datetime ? seq + 1 : 0; + dt = datetime; + + if (seq < seqStart) { continue; } + else if (seq >= seqEnd) { break; } + + char buf[50]; + struct tm* tm; + tm = localtime(&datetime); + size_t size = strftime(buf, 50, "%FT%T%z", tm); + std::string date(buf, size); + + std::ostringstream streamKey; + if (SpecialKeySpace::getActorLineageApiCommandRange("state").contains(kr)) { + streamKey << SpecialKeySpace::getActorLineageApiCommandPrefix("state").toString() << host.toString() + << "/" << to_string(waitState) << "/" << date; + } else if (SpecialKeySpace::getActorLineageApiCommandRange("time").contains(kr)) { + streamKey << SpecialKeySpace::getActorLineageApiCommandPrefix("time").toString() << host.toString() + << "/" << date << "/" << to_string(waitState); + ; + } else { + ASSERT(false); + } + streamKey << "/" << seq; + + msgpack::object_handle oh = msgpack::unpack(data.data(), data.size()); + msgpack::object deserialized = oh.get(); + + std::ostringstream stream; + stream << deserialized; + + result.push_back_deep(result.arena(), KeyValueRef(streamKey.str(), stream.str())); + } + } + + return result; +} + +Future ActorLineageImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const { + return actorLineageGetRangeActor(ryw, 
getKeyRange().begin, kr); +} + +namespace { +std::string_view to_string_view(StringRef sr) { + return std::string_view(reinterpret_cast(sr.begin()), sr.size()); +} +} // namespace + +ActorProfilerConf::ActorProfilerConf(KeyRangeRef kr) + : SpecialKeyRangeRWImpl(kr), config(ProfilerConfig::instance().getConfig()) {} + +Future ActorProfilerConf::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const { + RangeResult res; + std::string_view begin(to_string_view(kr.begin.removePrefix(range.begin))), + end(to_string_view(kr.end.removePrefix(range.begin))); + for (auto& p : config) { + if (p.first > end) { + break; + } else if (p.first > begin) { + KeyValueRef kv; + kv.key = StringRef(res.arena(), p.first); + kv.value = StringRef(res.arena(), p.second); + res.push_back(res.arena(), kv); + } + } + return res; +} + +void ActorProfilerConf::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) { + config[key.removePrefix(range.begin).toString()] = value.toString(); + didWrite = true; +} + +void ActorProfilerConf::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& kr) { + std::string begin(kr.begin.removePrefix(range.begin).toString()), end(kr.end.removePrefix(range.begin).toString()); + auto first = config.lower_bound(begin); + if (first == config.end()) { + // nothing to clear + return; + } + didWrite = true; + auto last = config.upper_bound(end); + config.erase(first, last); +} + +void ActorProfilerConf::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) { + std::string k = key.removePrefix(range.begin).toString(); + auto iter = config.find(k); + if (iter != config.end()) { + config.erase(iter); + } + didWrite = true; +} + +Future> ActorProfilerConf::commit(ReadYourWritesTransaction* ryw) { + Optional res{}; + try { + if (didWrite) { + ProfilerConfig::instance().reset(config); + } + return res; + } catch (ConfigError& err) { + return Optional{ err.description }; + } +} + MaintenanceImpl::MaintenanceImpl(KeyRangeRef kr) : 
SpecialKeyRangeRWImpl(kr) {} // Used to read the healthZoneKey diff --git a/fdbclient/SpecialKeySpace.actor.h b/fdbclient/SpecialKeySpace.actor.h index 084135bfb6..9bf6bb7109 100644 --- a/fdbclient/SpecialKeySpace.actor.h +++ b/fdbclient/SpecialKeySpace.actor.h @@ -140,6 +140,8 @@ public: class SpecialKeySpace { public: enum class MODULE { + ACTORLINEAGE, // Sampling data + ACTOR_PROFILER_CONF, // profiler configuration CLUSTERFILEPATH, CONFIGURATION, // Configuration of the cluster CONNECTIONSTRING, @@ -197,6 +199,12 @@ public: static KeyRef getManagementApiCommandPrefix(const std::string& command) { return managementApiCommandToRange.at(command).begin; } + static KeyRangeRef getActorLineageApiCommandRange(const std::string& command) { + return actorLineageApiCommandToRange.at(command); + } + static KeyRef getActorLineageApiCommandPrefix(const std::string& command) { + return actorLineageApiCommandToRange.at(command).begin; + } static Key getManagementApiCommandOptionSpecialKey(const std::string& command, const std::string& option); static const std::set& getManagementApiOptionsSet() { return options; } static const std::set& getTracingOptions() { return tracingOptions; } @@ -225,6 +233,7 @@ private: static std::unordered_map moduleToBoundary; static std::unordered_map managementApiCommandToRange; // management command to its special keys' range + static std::unordered_map actorLineageApiCommandToRange; static std::set options; // "/