Merge pull request #4802 from sfc-gh-ljoswiak/revert/actor-lineage

Revert "Merge pull request #4136 from sfc-gh-mpilman/features/actor-l…
This commit is contained in:
Lukas Joswiak 2021-05-11 18:50:42 -07:00 committed by GitHub
commit 6499fa178e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
54 changed files with 142 additions and 2898 deletions

View File

@ -152,7 +152,6 @@ if(CMAKE_SYSTEM_NAME STREQUAL "FreeBSD")
endif()
include(CompileBoost)
include(GetMsgpack)
add_subdirectory(flow)
add_subdirectory(fdbrpc)
add_subdirectory(fdbclient)

View File

@ -1,20 +0,0 @@
find_package(msgpack 3.3.0 EXACT QUIET CONFIG)
add_library(msgpack INTERFACE)
if(msgpack_FOUND)
target_link_libraries(msgpack INTERFACE msgpackc-cxx)
else()
include(ExternalProject)
ExternalProject_add(msgpackProject
URL "https://github.com/msgpack/msgpack-c/releases/download/cpp-3.3.0/msgpack-3.3.0.tar.gz"
URL_HASH SHA256=6e114d12a5ddb8cb11f669f83f32246e484a8addd0ce93f274996f1941c1f07b
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
)
ExternalProject_Get_property(msgpackProject SOURCE_DIR)
target_include_directories(msgpack SYSTEM INTERFACE "${SOURCE_DIR}/include")
add_dependencies(msgpack msgpackProject)
endif()

View File

@ -1,381 +0,0 @@
/*
* ActorLineageProfiler.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/flow.h"
#include "flow/singleton.h"
#include "fdbrpc/IAsyncFile.h"
#include "fdbclient/ActorLineageProfiler.h"
#include <msgpack.hpp>
#include <memory>
#include <boost/endian/conversion.hpp>
#include <boost/asio.hpp>
using namespace std::literals;
class Packer : public msgpack::packer<msgpack::sbuffer> {
struct visitor_t {
using VisitorMap = std::unordered_map<std::type_index, std::function<void(std::any const&, Packer& packer)>>;
VisitorMap visitorMap;
template <class T>
static void any_visitor(std::any const& val, Packer& packer) {
const T& v = std::any_cast<const T&>(val);
packer.pack(v);
}
template <class... Args>
struct populate_visitor_map;
template <class Head, class... Tail>
struct populate_visitor_map<Head, Tail...> {
static void populate(VisitorMap& map) {
map.emplace(std::type_index(typeid(Head)), any_visitor<Head>);
populate_visitor_map<Tail...>::populate(map);
}
};
template <class Head>
struct populate_visitor_map<Head> {
static void populate(VisitorMap&) {}
};
visitor_t() {
populate_visitor_map<int64_t,
uint64_t,
bool,
float,
double,
std::string,
std::string_view,
std::vector<std::any>,
std::map<std::string, std::any>,
std::map<std::string_view, std::any>,
std::vector<std::map<std::string_view, std::any>>>::populate(visitorMap);
}
void visit(const std::any& val, Packer& packer) {
auto iter = visitorMap.find(val.type());
if (iter == visitorMap.end()) {
TraceEvent(SevError, "PackerTypeNotFound").detail("Type", val.type().name());
} else {
iter->second(val, packer);
}
}
};
msgpack::sbuffer sbuffer;
// Initializing visitor_t involves building a type-map. As this is a relatively expensive operation, we don't want
// to do this each time we create a Packer object. So visitor_t is a stateless class and we only use it as a
// visitor.
crossbow::singleton<visitor_t> visitor;
public:
Packer() : msgpack::packer<msgpack::sbuffer>(sbuffer) {}
void pack(std::any const& val) { visitor->visit(val, *this); }
void pack(bool val) {
if (val) {
pack_true();
} else {
pack_false();
}
}
void pack(uint64_t val) {
if (val <= std::numeric_limits<uint8_t>::max()) {
pack_uint8(uint8_t(val));
} else if (val <= std::numeric_limits<uint16_t>::max()) {
pack_uint16(uint16_t(val));
} else if (val <= std::numeric_limits<uint32_t>::max()) {
pack_uint32(uint32_t(val));
} else {
pack_uint64(val);
}
}
void pack(int64_t val) {
if (val >= 0) {
this->pack(uint64_t(val));
} else if (val >= std::numeric_limits<uint8_t>::min()) {
pack_int8(int8_t(val));
} else if (val >= std::numeric_limits<int16_t>::min()) {
pack_int16(int16_t(val));
} else if (val >= std::numeric_limits<int32_t>::min()) {
pack_int32(int32_t(val));
} else if (val >= std::numeric_limits<int64_t>::min()) {
pack_int64(int64_t(val));
}
}
void pack(float val) { pack_float(val); }
void pack(double val) { pack_double(val); }
void pack(std::string const& str) {
pack_str(str.size());
pack_str_body(str.data(), str.size());
}
void pack(std::string_view val) {
pack_str(val.size());
pack_str_body(val.data(), val.size());
}
template <class K, class V>
void pack(std::map<K, V> const& map) {
pack_map(map.size());
for (const auto& p : map) {
pack(p.first);
pack(p.second);
}
}
template <class T>
void pack(std::vector<T> const& val) {
pack_array(val.size());
for (const auto& v : val) {
pack(v);
}
}
std::pair<char*, unsigned> getbuf() {
unsigned size = sbuffer.size();
return std::make_pair(sbuffer.release(), size);
}
};
IALPCollectorBase::IALPCollectorBase() {
SampleCollector::instance().addCollector(this);
}
std::map<std::string_view, std::any> SampleCollectorT::collect(ActorLineage* lineage) {
std::map<std::string_view, std::any> out;
for (auto& collector : collectors) {
auto val = collector->collect(lineage);
if (val.has_value()) {
out[collector->name()] = val.value();
}
}
return out;
}
std::shared_ptr<Sample> SampleCollectorT::collect() {
auto sample = std::make_shared<Sample>();
double time = g_network->now();
sample->time = time;
for (auto& p : getSamples) {
Packer packer;
std::vector<std::map<std::string_view, std::any>> samples;
auto sampleVec = p.second();
for (auto& val : sampleVec) {
auto m = collect(val.getPtr());
if (!m.empty()) {
samples.emplace_back(std::move(m));
}
}
if (!samples.empty()) {
packer.pack(samples);
sample->data[p.first] = packer.getbuf();
}
}
return sample;
}
void SampleCollection_t::refresh() {
auto sample = _collector->collect();
auto min = std::min(sample->time - windowSize, sample->time);
{
Lock _{ mutex };
data.emplace_back(std::move(sample));
}
double oldest = data.front()->time;
// we don't need to check for data.empty() in this loop (or the inner loop) as we know that we will end
// up with at least one entry which is the most recent sample
while (oldest < min) {
Lock _{ mutex };
// we remove at most 10 elements at a time. This is so we don't block the main thread for too long.
for (int i = 0; i < 10 && oldest < min; ++i) {
data.pop_front();
oldest = data.front()->time;
}
}
//config->ingest(sample);
}
std::vector<std::shared_ptr<Sample>> SampleCollection_t::get(double from /*= 0.0*/,
double to /*= std::numeric_limits<double>::max()*/) const {
Lock _{ mutex };
std::vector<std::shared_ptr<Sample>> res;
for (const auto& sample : data) {
if (sample->time > to) {
break;
} else if (sample->time >= from) {
res.push_back(sample);
}
}
return res;
}
struct ProfilerImpl {
boost::asio::io_context context;
boost::asio::executor_work_guard<decltype(context.get_executor())> workGuard;
boost::asio::steady_timer timer;
std::thread mainThread;
unsigned frequency;
SampleCollection collection;
ProfilerImpl() : workGuard(context.get_executor()), timer(context) {
mainThread = std::thread([this]() { context.run(); });
}
~ProfilerImpl() {
setFrequency(0);
workGuard.reset();
mainThread.join();
}
void profileHandler(boost::system::error_code const& ec) {
if (ec) {
return;
}
collection->refresh();
timer = boost::asio::steady_timer(context, std::chrono::microseconds(1000000 / frequency));
timer.async_wait([this](auto const& ec) { profileHandler(ec); });
}
void setFrequency(unsigned frequency) {
boost::asio::post(context, [this, frequency]() {
this->frequency = frequency;
timer.cancel();
if (frequency > 0) {
profileHandler(boost::system::error_code{});
}
});
}
};
ActorLineageProfilerT::ActorLineageProfilerT() : impl(new ProfilerImpl()) {
collection->collector()->addGetter(WaitState::Network,
std::bind(&ActorLineageSet::copy, std::ref(g_network->getActorLineageSet())));
collection->collector()->addGetter(
WaitState::Disk,
std::bind(&ActorLineageSet::copy, std::ref(IAsyncFileSystem::filesystem()->getActorLineageSet())));
collection->collector()->addGetter(WaitState::Running, []() {
auto res = currentLineageThreadSafe.get();
if (res.isValid()) {
return std::vector<Reference<ActorLineage>>({ res });
}
return std::vector<Reference<ActorLineage>>();
});
}
ActorLineageProfilerT::~ActorLineageProfilerT() {
delete impl;
}
void ActorLineageProfilerT::setFrequency(unsigned frequency) {
impl->setFrequency(frequency);
}
boost::asio::io_context& ActorLineageProfilerT::context() {
return impl->context;
}
SampleIngestor::~SampleIngestor() {}
// Callback used to update the sampling profilers run frequency whenever the
// frequency changes.
void samplingProfilerUpdateFrequency(std::optional<std::any> freq) {
double frequency = 0;
if (freq.has_value()) {
frequency = std::any_cast<double>(freq.value());
}
TraceEvent(SevInfo, "SamplingProfilerUpdateFrequency").detail("Frequency", frequency);
ActorLineageProfiler::instance().setFrequency(frequency);
}
void ProfilerConfigT::reset(std::map<std::string, std::string> const& config) {
bool expectNoMore = false, useFluentD = false, useTCP = false;
std::string endpoint;
ConfigError err;
for (auto& kv : config) {
if (expectNoMore) {
err.description = format("Unexpected option %s", kv.first.c_str());
throw err;
}
if (kv.first == "collector") {
std::string val = kv.second;
std::for_each(val.begin(), val.end(), [](auto c) { return std::tolower(c); });
if (val == "none") {
setBackend(std::make_shared<NoneIngestor>());
} else if (val == "fluentd") {
useFluentD = true;
} else {
err.description = format("Unsupported collector: %s", val.c_str());
throw err;
}
} else if (kv.first == "collector_endpoint") {
endpoint = kv.second;
} else if (kv.first == "collector_protocol") {
auto val = kv.second;
std::for_each(val.begin(), val.end(), [](auto c) { return std::tolower(c); });
if (val == "tcp") {
useTCP = true;
} else if (val == "udp") {
useTCP = false;
} else {
err.description = format("Unsupported protocol for fluentd: %s", kv.second.c_str());
throw err;
}
} else {
err.description = format("Unknown option %s", kv.first.c_str());
throw err;
}
}
if (useFluentD) {
if (endpoint.empty()) {
err.description = "Endpoint is required for fluentd ingestor";
throw err;
}
NetworkAddress address;
try {
address = NetworkAddress::parse(endpoint);
} catch (Error& e) {
err.description = format("Can't parse address %s", endpoint.c_str());
throw err;
}
setBackend(std::make_shared<FluentDIngestor>(
useTCP ? FluentDIngestor::Protocol::TCP : FluentDIngestor::Protocol::TCP, address));
}
}
std::map<std::string, std::string> ProfilerConfigT::getConfig() const {
std::map<std::string, std::string> res;
if (ingestor) {
ingestor->getConfig(res);
}
return res;
}
// Callback used to update the sample collector window size.
void samplingProfilerUpdateWindow(std::optional<std::any> window) {
double duration = 0;
if (window.has_value()) {
duration = std::any_cast<double>(window.value());
}
TraceEvent(SevInfo, "SamplingProfilerUpdateWindow").detail("Duration", duration);
SampleCollection::instance().setWindowSize(duration);
}

View File

@ -1,192 +0,0 @@
/*
* ActorLineageProfiler.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "fdbclient/AnnotateActor.h"
#include <optional>
#include <string>
#include <any>
#include <vector>
#include <mutex>
#include <condition_variable>
#include "flow/singleton.h"
#include "flow/flow.h"
void samplingProfilerUpdateFrequency(std::optional<std::any> freq);
void samplingProfilerUpdateWindow(std::optional<std::any> window);
struct IALPCollectorBase {
virtual std::optional<std::any> collect(ActorLineage*) = 0;
virtual const std::string_view& name() = 0;
IALPCollectorBase();
};
template <class T>
struct IALPCollector : IALPCollectorBase {
const std::string_view& name() override { return T::name; }
};
struct Sample : std::enable_shared_from_this<Sample> {
double time = 0.0;
Sample() {}
Sample(Sample const&) = delete;
Sample& operator=(Sample const&) = delete;
std::unordered_map<WaitState, std::pair<char*, unsigned>> data;
~Sample() {
std::for_each(data.begin(), data.end(), [](std::pair<WaitState, std::pair<char*, unsigned>> entry) {
::free(entry.second.first);
});
}
};
class SampleIngestor : std::enable_shared_from_this<SampleIngestor> {
public:
virtual ~SampleIngestor();
virtual void ingest(std::shared_ptr<Sample> const& sample) = 0;
virtual void getConfig(std::map<std::string, std::string>&) const = 0;
};
class NoneIngestor : public SampleIngestor {
public:
void ingest(std::shared_ptr<Sample> const& sample) override {}
void getConfig(std::map<std::string, std::string>& res) const override { res["ingestor"] = "none"; }
};
// The FluentD ingestor uses the pimpl idiom. This is to make compilation less heavy weight as this implementation has
// dependencies to boost::asio
struct FluentDIngestorImpl;
class FluentDIngestor : public SampleIngestor {
public: // Public Types
enum class Protocol { TCP, UDP };
private: // members
FluentDIngestorImpl* impl;
public: // interface
void ingest(std::shared_ptr<Sample> const& sample) override;
FluentDIngestor(Protocol protocol, NetworkAddress& endpoint);
void getConfig(std::map<std::string, std::string>& res) const override;
~FluentDIngestor();
};
struct ConfigError {
std::string description;
};
class ProfilerConfigT {
private: // private types
using Lock = std::unique_lock<std::mutex>;
friend class crossbow::create_static<ProfilerConfigT>;
private: // members
std::shared_ptr<SampleIngestor> ingestor = std::make_shared<NoneIngestor>();
private: // construction
ProfilerConfigT() {}
ProfilerConfigT(ProfilerConfigT const&) = delete;
ProfilerConfigT& operator=(ProfilerConfigT const&) = delete;
void setBackend(std::shared_ptr<SampleIngestor> ingestor) { this->ingestor = ingestor; }
public:
void reset(std::map<std::string, std::string> const& config);
std::map<std::string, std::string> getConfig() const;
};
using ProfilerConfig = crossbow::singleton<ProfilerConfigT>;
class SampleCollectorT {
public: // Types
friend struct crossbow::create_static<SampleCollectorT>;
using Getter = std::function<std::vector<Reference<ActorLineage>>()>;
private:
std::vector<IALPCollectorBase*> collectors;
std::map<WaitState, Getter> getSamples;
SampleCollectorT() {}
std::map<std::string_view, std::any> collect(ActorLineage* lineage);
public:
void addCollector(IALPCollectorBase* collector) { collectors.push_back(collector); }
std::shared_ptr<Sample> collect();
void addGetter(WaitState waitState, Getter const& getter) { getSamples[waitState] = getter; };
};
using SampleCollector = crossbow::singleton<SampleCollectorT>;
class SampleCollection_t {
friend struct crossbow::create_static<SampleCollection_t>;
using Lock = std::unique_lock<std::mutex>;
SampleCollection_t() {}
SampleCollector _collector;
mutable std::mutex mutex;
std::atomic<double> windowSize = 0.0;
std::deque<std::shared_ptr<Sample>> data;
ProfilerConfig config;
public:
/**
* Define how many samples the collection shoul keep. The window size is defined by time dimension.
*
* \param duration How long a sample should be kept in the collection.
*/
void setWindowSize(double duration) { windowSize.store(duration); }
/**
* By default returns reference counted pointers of all samples. A window can be defined in terms of absolute time.
*
* \param from The minimal age of all returned samples.
* \param to The max age of all returned samples.
*/
std::vector<std::shared_ptr<Sample>> get(double from = 0.0, double to = std::numeric_limits<double>::max()) const;
/**
* Collects all new samples from the sample collector and stores them in the collection.
*/
void refresh();
const SampleCollector& collector() const { return _collector; }
SampleCollector& collector() { return _collector; }
};
using SampleCollection = crossbow::singleton<SampleCollection_t>;
struct ProfilerImpl;
namespace boost {
namespace asio {
// forward declare io_context because including boost asio is super expensive
class io_context;
} // namespace asio
} // namespace boost
class ActorLineageProfilerT {
friend struct crossbow::create_static<ActorLineageProfilerT>;
ProfilerImpl* impl;
SampleCollection collection;
ActorLineageProfilerT();
public:
~ActorLineageProfilerT();
void setFrequency(unsigned frequency);
boost::asio::io_context& context();
};
using ActorLineageProfiler = crossbow::singleton<ActorLineageProfilerT>;

View File

@ -1,23 +0,0 @@
/*
* AnnotateActor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/AnnotateActor.h"
std::map<WaitState, std::function<std::vector<Reference<ActorLineage>>()>> samples;

View File

@ -1,85 +0,0 @@
/*
* AnnotateActor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "flow/flow.h"
#include "flow/network.h"
#include <string_view>
// Used to manually instrument waiting actors to collect samples for the
// sampling profiler.
struct AnnotateActor {
unsigned index;
bool set;
AnnotateActor() : set(false) {}
AnnotateActor(Reference<ActorLineage> lineage) : set(true) {
index = g_network->getActorLineageSet().insert(lineage);
if (index == ActorLineageSet::npos) {
set = false;
}
}
AnnotateActor(const AnnotateActor& other) = delete;
AnnotateActor(AnnotateActor&& other) = delete;
AnnotateActor& operator=(const AnnotateActor& other) = delete;
AnnotateActor& operator=(AnnotateActor&& other) {
if (this == &other) {
return *this;
}
this->index = other.index;
this->set = other.set;
other.set = false;
return *this;
}
~AnnotateActor() {
if (set) {
g_network->getActorLineageSet().erase(index);
}
}
};
enum class WaitState { Disk, Network, Running };
// usually we shouldn't use `using namespace` in a header file, but literals should be safe as user defined literals
// need to be prefixed with `_`
using namespace std::literals;
constexpr std::string_view to_string(WaitState st) {
switch (st) {
case WaitState::Disk:
return "Disk"sv;
case WaitState::Network:
return "Network"sv;
case WaitState::Running:
return "Running"sv;
default:
return ""sv;
}
}
extern std::map<WaitState, std::function<std::vector<Reference<ActorLineage>>()>> samples;

View File

@ -1,7 +1,4 @@
set(FDBCLIENT_SRCS
ActorLineageProfiler.h
ActorLineageProfiler.cpp
AnnotateActor.cpp
AsyncFileS3BlobStore.actor.cpp
AsyncFileS3BlobStore.actor.h
AsyncTaskThread.actor.cpp
@ -30,7 +27,6 @@ set(FDBCLIENT_SRCS
EventTypes.actor.h
FDBOptions.h
FDBTypes.h
FluentDSampleIngestor.cpp
FileBackupAgent.actor.cpp
GlobalConfig.h
GlobalConfig.actor.h
@ -144,7 +140,8 @@ endif()
add_flow_target(STATIC_LIBRARY NAME fdbclient SRCS ${FDBCLIENT_SRCS} ADDL_SRCS ${options_srcs})
add_dependencies(fdbclient fdboptions)
target_link_libraries(fdbclient PUBLIC fdbrpc msgpack)
if(BUILD_AZURE_BACKUP)
target_link_libraries(fdbclient PRIVATE curl uuid azure-storage-lite)
target_link_libraries(fdbclient PUBLIC fdbrpc PRIVATE curl uuid azure-storage-lite)
else()
target_link_libraries(fdbclient PUBLIC fdbrpc)
endif()

View File

@ -1,257 +0,0 @@
/*
* FluentDSampleIngestor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/ActorLineageProfiler.h"
#include <boost/asio.hpp>
#include <boost/asio/co_spawn.hpp>
#include <msgpack.hpp>
namespace {
boost::asio::ip::address ipAddress(IPAddress const& n) {
if (n.isV6()) {
return boost::asio::ip::address_v6(n.toV6());
} else {
return boost::asio::ip::address_v4(n.toV4());
}
}
template <class Protocol>
boost::asio::ip::basic_endpoint<Protocol> toEndpoint(NetworkAddress const n) {
return boost::asio::ip::basic_endpoint<Protocol>(ipAddress(n.ip), n.port);
}
struct FluentDSocket {
virtual ~FluentDSocket() {}
virtual void connect(NetworkAddress const& endpoint) = 0;
virtual void send(std::shared_ptr<Sample> const& sample) = 0;
virtual const boost::system::error_code& failed() const = 0;
};
template <class Protocol, class Callback>
class SampleSender : public std::enable_shared_from_this<SampleSender<Protocol, Callback>> {
using Socket = typename Protocol::socket;
using Iter = typename decltype(Sample::data)::iterator;
Socket& socket;
Callback callback;
Iter iter, end;
struct Buf {
const char* data;
const unsigned size;
Buf(const char* data, unsigned size) : data(data), size(size) {}
Buf(Buf const&) = delete;
Buf& operator=(Buf const&) = delete;
~Buf() { delete[] data; }
};
void sendCompletionHandler(boost::system::error_code const& ec) {
if (ec) {
callback(ec);
} else {
++iter;
sendNext();
}
}
void send(boost::asio::ip::tcp::socket& socket, std::shared_ptr<Buf> const& buf) {
boost::asio::async_write(
socket,
boost::asio::const_buffer(buf->data, buf->size),
[buf, self = this->shared_from_this()](auto const& ec, size_t) { self->sendCompletionHandler(ec); });
}
void send(boost::asio::ip::udp::socket& socket, std::shared_ptr<Buf> const& buf) {
socket.async_send(
boost::asio::const_buffer(buf->data, buf->size),
[buf, self = this->shared_from_this()](auto const& ec, size_t) { self->sendCompletionHandler(ec); });
}
void sendNext() {
if (iter == end) {
callback(boost::system::error_code());
}
// 1. calculate size of buffer
unsigned size = 1; // 1 for fixmap identifier byte
auto waitState = to_string(iter->first);
if (waitState.size() < 32) {
size = waitState.size() + 1;
} else {
size = waitState.size() + 2;
}
size += iter->second.second;
// 2. allocate the buffer
std::unique_ptr<char[]> buf(new char[size]);
unsigned off = 0;
// 3. serialize fixmap
buf[off++] = 0x81; // map of size 1
// 3.1 serialize key
if (waitState.size() < 32) {
buf[off++] = 0xa0 + waitState.size(); // fixstr
} else {
buf[off++] = 0xd9;
buf[off++] = char(waitState.size());
}
memcpy(buf.get() + off, waitState.data(), waitState.size());
off += waitState.size();
// 3.2 append serialized value
memcpy(buf.get() + off, iter->second.first, iter->second.second);
// 4. send the result to fluentd
send(socket, std::make_shared<Buf>(buf.release(), size));
}
public:
SampleSender(Socket& socket, Callback const& callback, std::shared_ptr<Sample> const& sample)
: socket(socket), callback(callback), iter(sample->data.begin()), end(sample->data.end()) {
sendNext();
}
};
// Sample function to make instanciation of SampleSender easier
template <class Protocol, class Callback>
std::shared_ptr<SampleSender<Protocol, Callback>> makeSampleSender(typename Protocol::socket& socket, Callback const& callback, std::shared_ptr<Sample> const& sample) {
return std::make_shared<SampleSender<Protocol, Callback>>(socket, callback, sample);
}
template <class Protocol>
struct FluentDSocketImpl : FluentDSocket, std::enable_shared_from_this<FluentDSocketImpl<Protocol>> {
static constexpr unsigned MAX_QUEUE_SIZE = 100;
boost::asio::io_context& context;
typename Protocol::socket socket;
FluentDSocketImpl(boost::asio::io_context& context) : context(context), socket(context) {}
bool ready = false;
std::deque<std::shared_ptr<Sample>> queue;
boost::system::error_code _failed;
const boost::system::error_code& failed() const override { return _failed; }
void sendCompletionHandler(boost::system::error_code const& ec) {
if (ec) {
// TODO: trace error
_failed = ec;
return;
}
if (queue.empty()) {
ready = true;
} else {
auto sample = queue.front();
queue.pop_front();
sendImpl(sample);
}
}
void sendImpl(std::shared_ptr<Sample> const& sample) {
makeSampleSender<Protocol>(socket, [self = this->shared_from_this()](boost::system::error_code const& ec){
self->sendCompletionHandler(ec);
}, sample);
}
void send(std::shared_ptr<Sample> const& sample) override {
if (_failed) {
return;
}
if (ready) {
ready = false;
sendImpl(sample);
} else {
if (queue.size() < MAX_QUEUE_SIZE) {
queue.push_back(sample);
} // TODO: else trace a warning
}
}
void connect(NetworkAddress const& endpoint) override {
auto to = toEndpoint<Protocol>(endpoint);
socket.async_connect(to, [self = this->shared_from_this()](boost::system::error_code const& ec) {
if (ec) {
// TODO: error handling
self->_failed = ec;
return;
}
self->ready = true;
});
}
};
} // namespace
struct FluentDIngestorImpl {
using Protocol = FluentDIngestor::Protocol;
Protocol protocol;
NetworkAddress endpoint;
boost::asio::io_context& io_context;
std::unique_ptr<FluentDSocket> socket;
boost::asio::steady_timer retryTimer;
FluentDIngestorImpl(Protocol protocol, NetworkAddress const& endpoint)
: protocol(protocol), endpoint(endpoint), io_context(ActorLineageProfiler::instance().context()),
retryTimer(io_context) {
connect();
}
~FluentDIngestorImpl() { retryTimer.cancel(); }
void connect() {
switch (protocol) {
case Protocol::TCP:
socket.reset(new FluentDSocketImpl<boost::asio::ip::tcp>(io_context));
break;
case Protocol::UDP:
socket.reset(new FluentDSocketImpl<boost::asio::ip::udp>(io_context));
break;
}
socket->connect(endpoint);
}
void retry() {
retryTimer = boost::asio::steady_timer(io_context, std::chrono::seconds(1));
retryTimer.async_wait([this](auto const& ec) {
if (ec) {
return;
}
connect();
});
socket.reset();
}
};
FluentDIngestor::~FluentDIngestor() {
delete impl;
}
FluentDIngestor::FluentDIngestor(Protocol protocol, NetworkAddress& endpoint)
: impl(new FluentDIngestorImpl(protocol, endpoint)) {}
void FluentDIngestor::ingest(const std::shared_ptr<Sample>& sample) {
if (!impl->socket) {
// the connection failed in the past and we wait for a timeout before we retry
return;
} else if (impl->socket->failed()) {
impl->retry();
return;
} else {
impl->socket->send(sample);
}
}
void FluentDIngestor::getConfig(std::map<std::string, std::string>& res) const {
res["ingestor"] = "fluentd";
res["collector_endpoint"] = impl->endpoint.toString();
res["collector_protocol"] = impl->protocol == Protocol::TCP ? "tcp" : "udp";
}

View File

@ -34,18 +34,14 @@ const KeyRef fdbClientInfoTxnSizeLimit = LiteralStringRef("config/fdb_client_inf
const KeyRef transactionTagSampleRate = LiteralStringRef("config/transaction_tag_sample_rate");
const KeyRef transactionTagSampleCost = LiteralStringRef("config/transaction_tag_sample_cost");
const KeyRef samplingFrequency = LiteralStringRef("visibility/sampling/frequency");
const KeyRef samplingWindow = LiteralStringRef("visibility/sampling/window");
GlobalConfig::GlobalConfig() : lastUpdate(0) {}
void GlobalConfig::create(DatabaseContext* cx, Reference<AsyncVar<ClientDBInfo>> dbInfo) {
if (g_network->global(INetwork::enGlobalConfig) == nullptr) {
auto config = new GlobalConfig{};
config->cx = Database(cx);
config->dbInfo = dbInfo;
g_network->setGlobal(INetwork::enGlobalConfig, config);
config->_updater = updater(config);
config->_updater = updater(config, dbInfo);
}
}
@ -55,10 +51,6 @@ GlobalConfig& GlobalConfig::globalConfig() {
return *reinterpret_cast<GlobalConfig*>(res);
}
void GlobalConfig::updateDBInfo(Reference<AsyncVar<ClientDBInfo>> dbInfo) {
// this->dbInfo = dbInfo;
}
Key GlobalConfig::prefixedKey(KeyRef key) {
return key.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::GLOBALCONFIG).begin);
}
@ -85,14 +77,6 @@ Future<Void> GlobalConfig::onInitialized() {
return initialized.getFuture();
}
Future<Void> GlobalConfig::onChange() {
return configChanged.onTrigger();
}
void GlobalConfig::trigger(KeyRef key, std::function<void(std::optional<std::any>)> fn) {
callbacks.emplace(key, std::move(fn));
}
void GlobalConfig::insert(KeyRef key, ValueRef value) {
data.erase(key);
@ -105,8 +89,6 @@ void GlobalConfig::insert(KeyRef key, ValueRef value) {
any = StringRef(arena, t.getString(0).contents());
} else if (t.getType(0) == Tuple::ElementType::INT) {
any = t.getInt(0);
} else if (t.getType(0) == Tuple::ElementType::BOOL) {
any = t.getBool(0);
} else if (t.getType(0) == Tuple::ElementType::FLOAT) {
any = t.getFloat(0);
} else if (t.getType(0) == Tuple::ElementType::DOUBLE) {
@ -115,26 +97,19 @@ void GlobalConfig::insert(KeyRef key, ValueRef value) {
ASSERT(false);
}
data[stableKey] = makeReference<ConfigValue>(std::move(arena), std::move(any));
if (callbacks.find(stableKey) != callbacks.end()) {
callbacks[stableKey](data[stableKey]->value);
}
} catch (Error& e) {
TraceEvent("GlobalConfigTupleParseError").detail("What", e.what());
}
}
void GlobalConfig::erase(Key key) {
erase(KeyRangeRef(key, keyAfter(key)));
void GlobalConfig::erase(KeyRef key) {
data.erase(key);
}
void GlobalConfig::erase(KeyRangeRef range) {
auto it = data.begin();
while (it != data.end()) {
if (range.contains(it->first)) {
if (callbacks.find(it->first) != callbacks.end()) {
callbacks[it->first](std::nullopt);
}
it = data.erase(it);
} else {
++it;
@ -188,7 +163,7 @@ ACTOR Future<Void> GlobalConfig::migrate(GlobalConfig* self) {
// Updates local copy of global configuration by reading the entire key-range
// from storage.
ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self) {
self->erase(KeyRangeRef(""_sr, "\xff"_sr));
self->data.clear();
Transaction tr(self->cx);
RangeResult result = wait(tr.getRange(globalConfigDataKeys, CLIENT_KNOBS->TOO_MANY));
@ -201,8 +176,7 @@ ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self) {
// Applies updates to the local copy of the global configuration when this
// process receives an updated history.
ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self) {
// wait(self->cx->onConnected());
ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self, Reference<AsyncVar<ClientDBInfo>> dbInfo) {
wait(self->migrate(self));
wait(self->refresh(self));
@ -210,9 +184,9 @@ ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self) {
loop {
try {
wait(self->dbInfo->onChange());
wait(dbInfo->onChange());
auto& history = self->dbInfo->get().history;
auto& history = dbInfo->get().history;
if (history.size() == 0) {
continue;
}
@ -222,8 +196,8 @@ ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self) {
// history updates or the protocol version changed, so it
// must re-read the entire configuration range.
wait(self->refresh(self));
if (self->dbInfo->get().history.size() > 0) {
self->lastUpdate = self->dbInfo->get().history.back().version;
if (dbInfo->get().history.size() > 0) {
self->lastUpdate = dbInfo->get().history.back().version;
}
} else {
// Apply history in order, from lowest version to highest
@ -248,8 +222,6 @@ ACTOR Future<Void> GlobalConfig::updater(GlobalConfig* self) {
self->lastUpdate = vh.version;
}
}
self->configChanged.trigger();
} catch (Error& e) {
throw;
}

View File

@ -27,9 +27,7 @@
#define FDBCLIENT_GLOBALCONFIG_ACTOR_H
#include <any>
#include <functional>
#include <map>
#include <optional>
#include <type_traits>
#include <unordered_map>
@ -51,9 +49,6 @@ extern const KeyRef fdbClientInfoTxnSizeLimit;
extern const KeyRef transactionTagSampleRate;
extern const KeyRef transactionTagSampleCost;
extern const KeyRef samplingFrequency;
extern const KeyRef samplingWindow;
// Structure used to hold the values stored by global configuration. The arena
// is used as memory to store both the key and the value (the value is only
// stored in the arena if it is an object; primitives are just copied).
@ -77,13 +72,6 @@ public:
// configuration.
static GlobalConfig& globalConfig();
// Updates the ClientDBInfo object used by global configuration to read new
// data. For server processes, this value needs to be set by the cluster
// controller, but global config is initialized before the cluster
// controller is, so this function provides a mechanism to update the
// object after initialization.
void updateDBInfo(Reference<AsyncVar<ClientDBInfo>> dbInfo);
// Use this function to turn a global configuration key defined above into
// the full path needed to set the value in the database.
//
@ -126,16 +114,6 @@ public:
// been created and is ready.
Future<Void> onInitialized();
// Triggers the returned future when any key-value pair in the global
// configuration changes.
Future<Void> onChange();
// Calls \ref fn when the value associated with \ref key is changed. \ref
// fn is passed the updated value for the key, or an empty optional if the
// key has been cleared. If the value is an allocated object, its memory
// remains in the control of the global configuration.
void trigger(KeyRef key, std::function<void(std::optional<std::any>)> fn);
private:
GlobalConfig();
@ -149,23 +127,20 @@ private:
void insert(KeyRef key, ValueRef value);
// Removes the given key (and associated value) from the local copy of the
// global configuration keyspace.
void erase(Key key);
void erase(KeyRef key);
// Removes the given key range (and associated values) from the local copy
// of the global configuration keyspace.
void erase(KeyRangeRef range);
ACTOR static Future<Void> migrate(GlobalConfig* self);
ACTOR static Future<Void> refresh(GlobalConfig* self);
ACTOR static Future<Void> updater(GlobalConfig* self);
ACTOR static Future<Void> updater(GlobalConfig* self, Reference<AsyncVar<ClientDBInfo>> dbInfo);
Database cx;
Reference<AsyncVar<ClientDBInfo>> dbInfo;
Future<Void> _updater;
Promise<Void> initialized;
AsyncTrigger configChanged;
std::unordered_map<StringRef, Reference<ConfigValue>> data;
Version lastUpdate;
std::unordered_map<KeyRef, std::function<void(std::optional<std::any>)>> callbacks;
};
#endif

View File

@ -32,8 +32,6 @@
#include "fdbrpc/FailureMonitor.h"
#include "fdbrpc/MultiInterface.h"
#include "fdbclient/ActorLineageProfiler.h"
#include "fdbclient/AnnotateActor.h"
#include "fdbclient/Atomic.h"
#include "fdbclient/ClusterInterface.h"
#include "fdbclient/CoordinationInterface.h"
@ -50,7 +48,6 @@
#include "fdbclient/SpecialKeySpace.actor.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/TransactionLineage.h"
#include "fdbclient/versions.h"
#include "fdbrpc/LoadBalance.h"
#include "fdbrpc/Net2FileSystem.h"
@ -88,8 +85,6 @@ using std::pair;
namespace {
TransactionLineageCollector transactionLineageCollector;
template <class Interface, class Request>
Future<REPLY_TYPE(Request)> loadBalance(
DatabaseContext* ctx,
@ -963,8 +958,6 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
getValueCompleted.init(LiteralStringRef("NativeAPI.GetValueCompleted"));
GlobalConfig::create(this, clientInfo);
GlobalConfig::globalConfig().trigger(samplingFrequency, samplingProfilerUpdateFrequency);
GlobalConfig::globalConfig().trigger(samplingWindow, samplingProfilerUpdateWindow);
monitorProxiesInfoChange = monitorProxiesChange(clientInfo, &proxiesChangeTrigger);
clientStatusUpdater.actor = clientStatusUpdateActor(this);
@ -1069,14 +1062,6 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
std::make_unique<DataDistributionImpl>(
KeyRangeRef(LiteralStringRef("data_distribution/"), LiteralStringRef("data_distribution0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::ACTORLINEAGE,
SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<ActorLineageImpl>(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ACTORLINEAGE)));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ActorProfilerConf>(SpecialKeySpace::getModuleRange(
SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF)));
}
if (apiVersionAtLeast(630)) {
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION,
@ -2523,10 +2508,8 @@ ACTOR Future<Version> watchValue(Future<Version> version,
cx->invalidateCache(key);
wait(delay(CLIENT_KNOBS->WRONG_SHARD_SERVER_DELAY, info.taskID));
} else if (e.code() == error_code_watch_cancelled || e.code() == error_code_process_behind) {
// clang-format off
TEST(e.code() == error_code_watch_cancelled); // Too many watches on the storage server, poll for changes instead
TEST(e.code() == error_code_watch_cancelled); // Too many watches on storage server, poll for changes
TEST(e.code() == error_code_process_behind); // The storage servers are all behind
// clang-format on
wait(delay(CLIENT_KNOBS->WATCH_POLLING_TIME, info.taskID));
} else if (e.code() == error_code_timed_out) { // The storage server occasionally times out watches in case
// it was cancelled
@ -3099,7 +3082,6 @@ ACTOR Future<RangeResult> getRange(Database cx,
throw deterministicRandom()->randomChoice(
std::vector<Error>{ transaction_too_old(), future_version() });
}
state AnnotateActor annotation(currentLineage);
GetKeyValuesReply _rep =
wait(loadBalance(cx.getPtr(),
beginServer.second,

View File

@ -1,81 +0,0 @@
/*
* ProcessInterface.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/AnnotateActor.h"
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
constexpr UID WLTOKEN_PROCESS(-1, 11);
struct ProcessInterface {
constexpr static FileIdentifier file_identifier = 985636;
RequestStream<struct GetProcessInterfaceRequest> getInterface;
RequestStream<struct ActorLineageRequest> actorLineage;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, actorLineage);
}
};
struct GetProcessInterfaceRequest {
constexpr static FileIdentifier file_identifier = 7632546;
ReplyPromise<ProcessInterface> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
}
};
// This type is used to send serialized sample data over the network.
struct SerializedSample {
constexpr static FileIdentifier file_identifier = 15785634;
double time;
std::unordered_map<WaitState, std::string> data;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, time, data);
}
};
struct ActorLineageReply {
constexpr static FileIdentifier file_identifier = 1887656;
std::vector<SerializedSample> samples;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, samples);
}
};
struct ActorLineageRequest {
constexpr static FileIdentifier file_identifier = 11654765;
WaitState waitStateStart, waitStateEnd;
time_t timeStart, timeEnd;
ReplyPromise<ActorLineageReply> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, waitStateStart, waitStateEnd, timeStart, timeEnd, reply);
}
};

View File

@ -21,14 +21,6 @@
#include "boost/lexical_cast.hpp"
#include "boost/algorithm/string.hpp"
#include <time.h>
#include <msgpack.hpp>
#include <exception>
#include "fdbclient/ActorLineageProfiler.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/ProcessInterface.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/SpecialKeySpace.actor.h"
#include "flow/Arena.h"
@ -75,12 +67,7 @@ std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToB
{ SpecialKeySpace::MODULE::GLOBALCONFIG,
KeyRangeRef(LiteralStringRef("\xff\xff/global_config/"), LiteralStringRef("\xff\xff/global_config0")) },
{ SpecialKeySpace::MODULE::TRACING,
KeyRangeRef(LiteralStringRef("\xff\xff/tracing/"), LiteralStringRef("\xff\xff/tracing0")) },
{ SpecialKeySpace::MODULE::ACTORLINEAGE,
KeyRangeRef(LiteralStringRef("\xff\xff/actor_lineage/"), LiteralStringRef("\xff\xff/actor_lineage0")) },
{ SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF,
KeyRangeRef(LiteralStringRef("\xff\xff/actor_profiler_conf/"),
LiteralStringRef("\xff\xff/actor_profiler_conf0")) }
KeyRangeRef(LiteralStringRef("\xff\xff/tracing/"), LiteralStringRef("\xff\xff/tracing0")) }
};
std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandToRange = {
@ -111,15 +98,6 @@ std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandT
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }
};
std::unordered_map<std::string, KeyRange> SpecialKeySpace::actorLineageApiCommandToRange = {
{ "state",
KeyRangeRef(LiteralStringRef("state/"), LiteralStringRef("state0"))
.withPrefix(moduleToBoundary[MODULE::ACTORLINEAGE].begin) },
{ "time",
KeyRangeRef(LiteralStringRef("time/"), LiteralStringRef("time0"))
.withPrefix(moduleToBoundary[MODULE::ACTORLINEAGE].begin) }
};
std::set<std::string> SpecialKeySpace::options = { "excluded/force", "failed/force" };
std::set<std::string> SpecialKeySpace::tracingOptions = { kTracingTransactionIdKey, kTracingTokenKey };
@ -1406,9 +1384,6 @@ Future<RangeResult> GlobalConfigImpl::getRange(ReadYourWritesTransaction* ryw, K
} else if (config->value.type() == typeid(int64_t)) {
result.push_back_deep(result.arena(),
KeyValueRef(prefixedKey, std::to_string(std::any_cast<int64_t>(config->value))));
} else if (config->value.type() == typeid(bool)) {
result.push_back_deep(result.arena(),
KeyValueRef(prefixedKey, std::to_string(std::any_cast<bool>(config->value))));
} else if (config->value.type() == typeid(float)) {
result.push_back_deep(result.arena(),
KeyValueRef(prefixedKey, std::to_string(std::any_cast<float>(config->value))));
@ -1930,272 +1905,6 @@ void ClientProfilingImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& ke
"Clear operation is forbidden for profile client. You can set it to default to disable profiling.");
}
ActorLineageImpl::ActorLineageImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
void parse(StringRef& val, int& i) {
i = std::stoi(val.toString());
}
void parse(StringRef& val, double& d) {
d = std::stod(val.toString());
}
void parse(StringRef& val, WaitState& w) {
if (val == LiteralStringRef("disk")) {
w = WaitState::Disk;
} else if (val == LiteralStringRef("network")) {
w = WaitState::Network;
} else if (val == LiteralStringRef("running")) {
w = WaitState::Running;
} else {
throw std::range_error("failed to parse run state");
}
}
void parse(StringRef& val, time_t& t) {
struct tm tm = { 0 };
if (strptime(val.toString().c_str(), "%FT%T%z", &tm) == nullptr) {
throw std::invalid_argument("failed to parse ISO 8601 datetime");
}
long timezone = tm.tm_gmtoff;
t = timegm(&tm);
if (t == -1) {
throw std::runtime_error("failed to convert ISO 8601 datetime");
}
t -= timezone;
}
void parse(StringRef& val, NetworkAddress& a) {
auto address = NetworkAddress::parse(val.toString());
if (!address.isValid()) {
throw std::invalid_argument("invalid host");
}
a = address;
}
// Base case function for parsing function below.
template <typename T>
void parse(std::vector<StringRef>::iterator it, std::vector<StringRef>::iterator end, T& t1) {
if (it == end) {
return;
}
parse(*it, t1);
}
// Given an iterator into a vector of string tokens, an iterator to the end of
// the search space in the vector (exclusive), and a list of references to
// types, parses each token in the vector into the associated type according to
// the order of the arguments.
//
// For example, given the vector ["1", "1.5", "127.0.0.1:4000"] and the
// argument list int a, double b, NetworkAddress c, after this function returns
// each parameter passed in will hold the parsed value from the token list.
//
// The appropriate parsing function must be implemented for the type you wish
// to parse. See the existing parsing functions above, and add your own if
// necessary.
template <typename T, typename... Types>
void parse(std::vector<StringRef>::iterator it, std::vector<StringRef>::iterator end, T& t1, Types&... remaining) {
// Return as soon as all tokens have been parsed. This allows parameters
// passed at the end to act as optional parameters -- they will only be set
// if the value exists.
if (it == end) {
return;
}
try {
parse(*it, t1);
parse(++it, end, remaining...);
} catch (Error& e) {
throw e;
} catch (std::exception& e) {
throw e;
}
}
ACTOR static Future<RangeResult> actorLineageGetRangeActor(ReadYourWritesTransaction* ryw,
KeyRef prefix,
KeyRangeRef kr) {
state RangeResult result;
// Set default values for all fields. The default will be used if the field
// is missing in the key.
state NetworkAddress host;
state WaitState waitStateStart = WaitState{ 0 };
state WaitState waitStateEnd = WaitState{ 2 };
state time_t timeStart = 0;
state time_t timeEnd = std::numeric_limits<time_t>::max();
state int seqStart = 0;
state int seqEnd = std::numeric_limits<int>::max();
state std::vector<StringRef> beginValues = kr.begin.removePrefix(prefix).splitAny("/"_sr);
state std::vector<StringRef> endValues = kr.end.removePrefix(prefix).splitAny("/"_sr);
// Require index (either "state" or "time") and address:port.
if (beginValues.size() < 2 || endValues.size() < 2) {
ryw->setSpecialKeySpaceErrorMsg("missing required parameters (index, host)");
throw special_keys_api_failure();
}
state NetworkAddress endRangeHost;
try {
if (SpecialKeySpace::getActorLineageApiCommandRange("state").contains(kr)) {
// For the range \xff\xff/actor_lineage/state/ip:port/wait-state/time/seq
parse(beginValues.begin() + 1, beginValues.end(), host, waitStateStart, timeStart, seqStart);
if (kr.begin != kr.end) {
parse(endValues.begin() + 1, endValues.end(), endRangeHost, waitStateEnd, timeEnd, seqEnd);
}
} else if (SpecialKeySpace::getActorLineageApiCommandRange("time").contains(kr)) {
// For the range \xff\xff/actor_lineage/time/ip:port/time/wait-state/seq
parse(beginValues.begin() + 1, beginValues.end(), host, timeStart, waitStateStart, seqStart);
if (kr.begin != kr.end) {
parse(endValues.begin() + 1, endValues.end(), endRangeHost, timeEnd, waitStateEnd, seqEnd);
}
} else {
ryw->setSpecialKeySpaceErrorMsg("invalid index in actor_lineage");
throw special_keys_api_failure();
}
} catch (Error& e) {
if (e.code() != special_keys_api_failure().code()) {
ryw->setSpecialKeySpaceErrorMsg("failed to parse key");
throw special_keys_api_failure();
} else {
throw e;
}
}
if (kr.begin != kr.end && host != endRangeHost) {
// The client doesn't know about all the hosts, so a get range covering
// multiple hosts has no way of knowing which IP:port combos to use.
ryw->setSpecialKeySpaceErrorMsg("the host must remain the same on both ends of the range");
throw special_keys_api_failure();
}
// Open endpoint to target process on each call. This can be optimized at
// some point...
state ProcessInterface process;
process.getInterface = RequestStream<GetProcessInterfaceRequest>(Endpoint({ host }, WLTOKEN_PROCESS));
ProcessInterface p = wait(retryBrokenPromise(process.getInterface, GetProcessInterfaceRequest{}));
process = p;
ActorLineageRequest actorLineageRequest;
actorLineageRequest.waitStateStart = waitStateStart;
actorLineageRequest.waitStateEnd = waitStateEnd;
actorLineageRequest.timeStart = timeStart;
actorLineageRequest.timeEnd = timeEnd;
ActorLineageReply reply = wait(process.actorLineage.getReply(actorLineageRequest));
time_t dt = 0;
int seq = -1;
for (const auto& sample : reply.samples) {
for (const auto& [waitState, data] : sample.data) {
time_t datetime = (time_t)sample.time;
seq = dt == datetime ? seq + 1 : 0;
dt = datetime;
if (seq < seqStart) { continue; }
else if (seq >= seqEnd) { break; }
char buf[50];
struct tm* tm;
tm = localtime(&datetime);
size_t size = strftime(buf, 50, "%FT%T%z", tm);
std::string date(buf, size);
std::ostringstream streamKey;
if (SpecialKeySpace::getActorLineageApiCommandRange("state").contains(kr)) {
streamKey << SpecialKeySpace::getActorLineageApiCommandPrefix("state").toString() << host.toString()
<< "/" << to_string(waitState) << "/" << date;
} else if (SpecialKeySpace::getActorLineageApiCommandRange("time").contains(kr)) {
streamKey << SpecialKeySpace::getActorLineageApiCommandPrefix("time").toString() << host.toString()
<< "/" << date << "/" << to_string(waitState);
;
} else {
ASSERT(false);
}
streamKey << "/" << seq;
msgpack::object_handle oh = msgpack::unpack(data.data(), data.size());
msgpack::object deserialized = oh.get();
std::ostringstream stream;
stream << deserialized;
result.push_back_deep(result.arena(), KeyValueRef(streamKey.str(), stream.str()));
}
}
return result;
}
Future<RangeResult> ActorLineageImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
return actorLineageGetRangeActor(ryw, getKeyRange().begin, kr);
}
namespace {
std::string_view to_string_view(StringRef sr) {
return std::string_view(reinterpret_cast<const char*>(sr.begin()), sr.size());
}
} // namespace
ActorProfilerConf::ActorProfilerConf(KeyRangeRef kr)
: SpecialKeyRangeRWImpl(kr), config(ProfilerConfig::instance().getConfig()) {}
Future<RangeResult> ActorProfilerConf::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
RangeResult res;
std::string_view begin(to_string_view(kr.begin.removePrefix(range.begin))),
end(to_string_view(kr.end.removePrefix(range.begin)));
for (auto& p : config) {
if (p.first > end) {
break;
} else if (p.first > begin) {
KeyValueRef kv;
kv.key = StringRef(res.arena(), p.first);
kv.value = StringRef(res.arena(), p.second);
res.push_back(res.arena(), kv);
}
}
return res;
}
void ActorProfilerConf::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) {
config[key.removePrefix(range.begin).toString()] = value.toString();
didWrite = true;
}
void ActorProfilerConf::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& kr) {
std::string begin(kr.begin.removePrefix(range.begin).toString()), end(kr.end.removePrefix(range.begin).toString());
auto first = config.lower_bound(begin);
if (first == config.end()) {
// nothing to clear
return;
}
didWrite = true;
auto last = config.upper_bound(end);
config.erase(first, last);
}
void ActorProfilerConf::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) {
std::string k = key.removePrefix(range.begin).toString();
auto iter = config.find(k);
if (iter != config.end()) {
config.erase(iter);
}
didWrite = true;
}
Future<Optional<std::string>> ActorProfilerConf::commit(ReadYourWritesTransaction* ryw) {
Optional<std::string> res{};
try {
if (didWrite) {
ProfilerConfig::instance().reset(config);
}
return res;
} catch (ConfigError& err) {
return Optional<std::string>{ err.description };
}
}
MaintenanceImpl::MaintenanceImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
// Used to read the healthZoneKey

View File

@ -140,8 +140,6 @@ public:
class SpecialKeySpace {
public:
enum class MODULE {
ACTORLINEAGE, // Sampling data
ACTOR_PROFILER_CONF, // profiler configuration
CLUSTERFILEPATH,
CONFIGURATION, // Configuration of the cluster
CONNECTIONSTRING,
@ -199,12 +197,6 @@ public:
static KeyRef getManagementApiCommandPrefix(const std::string& command) {
return managementApiCommandToRange.at(command).begin;
}
static KeyRangeRef getActorLineageApiCommandRange(const std::string& command) {
return actorLineageApiCommandToRange.at(command);
}
static KeyRef getActorLineageApiCommandPrefix(const std::string& command) {
return actorLineageApiCommandToRange.at(command).begin;
}
static Key getManagementApiCommandOptionSpecialKey(const std::string& command, const std::string& option);
static const std::set<std::string>& getManagementApiOptionsSet() { return options; }
static const std::set<std::string>& getTracingOptions() { return tracingOptions; }
@ -233,7 +225,6 @@ private:
static std::unordered_map<SpecialKeySpace::MODULE, KeyRange> moduleToBoundary;
static std::unordered_map<std::string, KeyRange>
managementApiCommandToRange; // management command to its special keys' range
static std::unordered_map<std::string, KeyRange> actorLineageApiCommandToRange;
static std::set<std::string> options; // "<command>/<option>"
static std::set<std::string> tracingOptions;
@ -395,32 +386,12 @@ public:
void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override;
};
class ActorLineageImpl : public SpecialKeyRangeReadImpl {
public:
explicit ActorLineageImpl(KeyRangeRef kr);
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
};
class ActorProfilerConf : public SpecialKeyRangeRWImpl {
bool didWrite = false;
std::map<std::string, std::string> config;
public:
explicit ActorProfilerConf(KeyRangeRef kr);
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) override;
void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) override;
void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override;
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
class MaintenanceImpl : public SpecialKeyRangeRWImpl {
public:
explicit MaintenanceImpl(KeyRangeRef kr);
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
class DataDistributionImpl : public SpecialKeyRangeRWImpl {
public:
explicit DataDistributionImpl(KeyRangeRef kr);

View File

@ -1,25 +0,0 @@
/*
* TransactionLineage.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/TransactionLineage.h"
namespace {
TransactionLineageCollector transactionLineageCollector;
}

View File

@ -1,128 +0,0 @@
/*
* TransactionLineage.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "fdbclient/ActorLineageProfiler.h"
struct TransactionLineage : LineageProperties<TransactionLineage> {
enum class Operation {
Unset,
GetValue,
GetKey,
GetKeyValues,
WatchValue,
GetConsistentReadVersion,
Commit,
GetKeyServersLocations
};
static constexpr std::string_view name = "Transaction"sv;
uint64_t txID;
Operation operation = Operation::Unset;
bool isSet(uint64_t TransactionLineage::*member) const { return this->*member > 0; }
bool isSet(Operation TransactionLineage::*member) const { return this->*member != Operation::Unset; }
};
struct TransactionLineageCollector : IALPCollector<TransactionLineage> {
using Operation = TransactionLineage::Operation;
std::optional<std::any> collect(ActorLineage* lineage) {
std::map<std::string_view, std::any> res;
auto txID = lineage->get(&TransactionLineage::txID);
if (txID.has_value()) {
res["ID"sv] = txID.value();
}
auto operation = lineage->get(&TransactionLineage::operation);
if (operation.has_value()) {
switch (operation.value()) {
case Operation::Unset:
res["operation"sv] = "Unset"sv;
break;
case Operation::GetValue:
res["operation"sv] = "GetValue"sv;
break;
case Operation::GetKey:
res["operation"sv] = "GetKey"sv;
break;
case Operation::GetKeyValues:
res["operation"sv] = "GetKeyValues"sv;
break;
case Operation::WatchValue:
res["operation"sv] = "WatchValue"sv;
break;
case Operation::GetConsistentReadVersion:
res["operation"sv] = "GetConsistentReadVersion"sv;
break;
case Operation::Commit:
res["operation"sv] = "Commit"sv;
break;
case Operation::GetKeyServersLocations:
res["operation"sv] = "GetKeyServersLocations"sv;
break;
}
}
if (res.empty()) {
return std::optional<std::any>{};
} else {
return res;
}
}
};
template <class T, class V>
class ScopedLineage {
V before;
V T::*member;
bool valid = true;
public:
ScopedLineage(V T::*member, V const& value) : member(member) {
auto& val = currentLineage->modify(member);
before = val;
val = value;
}
~ScopedLineage() {
if (!valid) {
return;
}
currentLineage->modify(member) = before;
}
ScopedLineage(ScopedLineage<T, V>&& o) : before(std::move(o.before)), member(o.member), valid(o.valid) {
o.release();
}
ScopedLineage& operator=(ScopedLineage<T, V>&& o) {
if (valid) {
currentLineage->modify(member) = before;
}
before = std::move(o.before);
member = o.member;
valid = o.valid;
o.release();
return *this;
}
ScopedLineage(const ScopedLineage<T, V>&) = delete;
ScopedLineage& operator=(const ScopedLineage<T, V>&) = delete;
void release() { valid = false; }
};
template <class T, class V>
ScopedLineage<T, V> make_scoped_lineage(V T::*member, V const& value) {
return ScopedLineage<T, V>(member, value);
}

View File

@ -71,8 +71,6 @@ Tuple::Tuple(StringRef const& str, bool exclude_incomplete) {
i += sizeof(float) + 1;
} else if (data[i] == 0x21) {
i += sizeof(double) + 1;
} else if (data[i] == 0x26 || data[i] == 0x27) {
i += 1;
} else if (data[i] == '\x00') {
i += 1;
} else {
@ -146,16 +144,6 @@ Tuple& Tuple::append(int64_t value) {
return *this;
}
Tuple& Tuple::appendBool(bool value) {
offsets.push_back(data.size());
if (value) {
data.push_back(data.arena(), 0x27);
} else {
data.push_back(data.arena(), 0x26);
}
return *this;
}
Tuple& Tuple::appendFloat(float value) {
offsets.push_back(data.size());
float swap = bigEndianFloat(value);
@ -204,8 +192,6 @@ Tuple::ElementType Tuple::getType(size_t index) const {
return ElementType::FLOAT;
} else if (code == 0x21) {
return ElementType::DOUBLE;
} else if (code == 0x26 || code == 0x27) {
return ElementType::BOOL;
} else {
throw invalid_tuple_data_type();
}
@ -301,21 +287,6 @@ int64_t Tuple::getInt(size_t index, bool allow_incomplete) const {
}
// TODO: Combine with bindings/flow/Tuple.*. This code is copied from there.
bool Tuple::getBool(size_t index) const {
if (index >= offsets.size()) {
throw invalid_tuple_index();
}
ASSERT_LT(offsets[index], data.size());
uint8_t code = data[offsets[index]];
if (code == 0x26) {
return false;
} else if (code == 0x27) {
return true;
} else {
throw invalid_tuple_data_type();
}
}
float Tuple::getFloat(size_t index) const {
if (index >= offsets.size()) {
throw invalid_tuple_index();

View File

@ -40,7 +40,6 @@ struct Tuple {
Tuple& append(int64_t);
// There are some ambiguous append calls in fdbclient, so to make it easier
// to add append for floats and doubles, name them differently for now.
Tuple& appendBool(bool);
Tuple& appendFloat(float);
Tuple& appendDouble(double);
Tuple& appendNull();
@ -52,7 +51,7 @@ struct Tuple {
return append(t);
}
enum ElementType { NULL_TYPE, INT, BYTES, UTF8, BOOL, FLOAT, DOUBLE };
enum ElementType { NULL_TYPE, INT, BYTES, UTF8, FLOAT, DOUBLE };
// this is number of elements, not length of data
size_t size() const { return offsets.size(); }
@ -60,7 +59,6 @@ struct Tuple {
ElementType getType(size_t index) const;
Standalone<StringRef> getString(size_t index) const;
int64_t getInt(size_t index, bool allow_incomplete = false) const;
bool getBool(size_t index) const;
float getFloat(size_t index) const;
double getDouble(size_t index) const;

View File

@ -242,12 +242,7 @@ public:
// result = map(result, [=](int r) mutable { KAIOLogBlockEvent(io, OpLogEntry::READY, r); return r; });
#endif
auto& actorLineageSet = IAsyncFileSystem::filesystem()->getActorLineageSet();
auto index = actorLineageSet.insert(currentLineage);
ASSERT(index != ActorLineageSet::npos);
Future<Void> res = success(result);
actorLineageSet.erase(index);
return res;
return success(result);
}
// TODO(alexmiller): Remove when we upgrade the dev docker image to >14.10
#ifndef FALLOC_FL_ZERO_RANGE

View File

@ -24,7 +24,6 @@
#include "flow/UnitTest.h"
#include "flow/DeterministicRandom.h"
#include "flow/IThreadPool.h"
#include "flow/WriteOnlySet.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/TLSConfig.actor.h"
@ -284,9 +283,6 @@ struct YieldMockNetwork final : INetwork, ReferenceCounted<YieldMockNetwork> {
static TLSConfig emptyConfig;
return emptyConfig;
}
ActorLineageSet& getActorLineageSet() override {
throw std::exception();
}
ProtocolVersion protocolVersion() override { return baseNetwork->protocolVersion(); }
};

View File

@ -334,7 +334,7 @@ ACTOR Future<Void> pingLatencyLogger(TransportData* self) {
}
TransportData::TransportData(uint64_t transportId)
: endpoints(/*wellKnownTokenCount*/ 12), endpointNotFoundReceiver(endpoints), pingReceiver(endpoints),
: endpoints(/*wellKnownTokenCount*/ 11), endpointNotFoundReceiver(endpoints), pingReceiver(endpoints),
warnAlwaysForLargePacket(true), lastIncompatibleMessage(0), transportId(transportId),
numIncompatibleConnections(0) {
degraded = makeReference<AsyncVar<bool>>(false);

View File

@ -25,7 +25,6 @@
#include <ctime>
#include "flow/flow.h"
#include "flow/WriteOnlySet.h"
#include "fdbrpc/IRateControl.h"
// All outstanding operations must be cancelled before the destructor of IAsyncFile is called.
@ -119,9 +118,6 @@ public:
// Returns the time of the last modification of the file.
virtual Future<std::time_t> lastWriteTime(const std::string& filename) = 0;
// Returns the shared memory data structure used to store actor lineages.
virtual ActorLineageSet& getActorLineageSet() = 0;
static IAsyncFileSystem* filesystem() { return filesystem(g_network); }
static runCycleFuncPtr runCycleFunc() {
return reinterpret_cast<runCycleFuncPtr>(

View File

@ -71,7 +71,6 @@ struct ProcessClass {
Ratekeeper,
StorageCache,
Backup,
Worker, // used for actor lineage tracking
NoRole
};
enum ClassSource { CommandLineSource, AutoSource, DBSource, InvalidSource = -1 };

View File

@ -89,10 +89,6 @@ Future<std::time_t> Net2FileSystem::lastWriteTime(const std::string& filename) {
return Net2AsyncFile::lastWriteTime(filename);
}
ActorLineageSet& Net2FileSystem::getActorLineageSet() {
return actorLineageSet;
}
void Net2FileSystem::newFileSystem(double ioTimeout, const std::string& fileSystemPath) {
g_network->setGlobal(INetwork::enFileSystem, (flowGlobalType) new Net2FileSystem(ioTimeout, fileSystemPath));
}

View File

@ -39,8 +39,6 @@ public:
Future<Void> renameFile(std::string const& from, std::string const& to) override;
ActorLineageSet& getActorLineageSet() override;
// void init();
static void stop();
@ -54,7 +52,6 @@ public:
dev_t fileSystemDeviceId;
bool checkFileSystem;
#endif
ActorLineageSet actorLineageSet;
};
#endif

View File

@ -31,7 +31,6 @@
#include "flow/IThreadPool.h"
#include "flow/ProtocolVersion.h"
#include "flow/Util.h"
#include "flow/WriteOnlySet.h"
#include "fdbrpc/IAsyncFile.h"
#include "fdbrpc/AsyncFileCached.actor.h"
#include "fdbrpc/AsyncFileNonDurable.actor.h"
@ -976,10 +975,6 @@ public:
bool checkRunnable() override { return net2->checkRunnable(); }
ActorLineageSet& getActorLineageSet() override {
return actorLineageSet;
}
void stop() override { isStopped = true; }
void addStopCallback(std::function<void()> fn) override { stopCallbacks.emplace_back(std::move(fn)); }
bool isSimulated() const override { return true; }
@ -2122,8 +2117,6 @@ public:
// Whether or not yield has returned true during the current iteration of the run loop
bool yielded;
int yield_limit; // how many more times yield may return false before next returning true
ActorLineageSet actorLineageSet;
};
class UDPSimSocket : public IUDPSocket, ReferenceCounted<UDPSimSocket> {
@ -2501,10 +2494,6 @@ Future<std::time_t> Sim2FileSystem::lastWriteTime(const std::string& filename) {
return fileWrites[filename];
}
ActorLineageSet& Sim2FileSystem::getActorLineageSet() {
return actorLineageSet;
}
void Sim2FileSystem::newFileSystem() {
g_network->setGlobal(INetwork::enFileSystem, (flowGlobalType) new Sim2FileSystem());
}

View File

@ -472,8 +472,6 @@ public:
Future<std::time_t> lastWriteTime(const std::string& filename) override;
ActorLineageSet& getActorLineageSet() override;
Future<Void> renameFile(std::string const& from, std::string const& to) override;
Sim2FileSystem() {}
@ -481,8 +479,6 @@ public:
~Sim2FileSystem() override {}
static void newFileSystem();
ActorLineageSet actorLineageSet;
};
#endif

View File

@ -85,11 +85,8 @@ set(FDBSERVER_SRCS
RestoreWorker.actor.cpp
Resolver.actor.cpp
ResolverInterface.h
RoleLineage.actor.h
RoleLineage.actor.cpp
ServerDBInfo.actor.h
ServerDBInfo.h
SigStack.cpp
SimulatedCluster.actor.cpp
SimulatedCluster.h
SkipList.cpp

View File

@ -135,9 +135,7 @@ public:
true,
TaskPriority::DefaultEndpoint,
true)) // SOMEDAY: Locality!
{
GlobalConfig::globalConfig().updateDBInfo(clientInfo);
}
{}
void setDistributor(const DataDistributorInterface& interf) {
auto newInfo = serverInfo->get();

View File

@ -28,7 +28,6 @@
#include "fdbclient/CommitProxyInterface.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/TransactionLineage.h"
#include "fdbrpc/sim_validation.h"
#include "fdbserver/ApplyMetadataMutation.h"
#include "fdbserver/ConflictSet.h"
@ -1397,7 +1396,6 @@ ACTOR Future<Void> commitBatch(ProxyCommitData* self,
// WARNING: this code is run at a high priority (until the first delay(0)), so it needs to do as little work as
// possible
state CommitBatch::CommitBatchContext context(self, trs, currentBatchMemBytesCount);
currentLineage->modify(&TransactionLineage::operation) = TransactionLineage::Operation::Commit;
// Active load balancing runs at a very high priority (to obtain accurate estimate of memory used by commit batches)
// so we need to downgrade here
@ -1434,8 +1432,6 @@ ACTOR Future<Void> commitBatch(ProxyCommitData* self,
ACTOR static Future<Void> doKeyServerLocationRequest(GetKeyServerLocationsRequest req, ProxyCommitData* commitData) {
// We can't respond to these requests until we have valid txnStateStore
currentLineage->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetKeyServersLocations;
currentLineage->modify(&TransactionLineage::txID) = req.spanContext.first();
wait(commitData->validState.getFuture());
wait(delay(0, TaskPriority::DefaultEndpoint));
@ -1941,7 +1937,7 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
state KeyRange txnKeys = allKeys;
RangeResult UIDtoTagMap = commitData.txnStateStore->readRange(serverTagKeys).get();
state std::map<Tag, UID> tag_uid;
for (const KeyValueRef& kv : UIDtoTagMap) {
for (const KeyValueRef kv : UIDtoTagMap) {
tag_uid[decodeServerTagValue(kv.value)] = decodeServerTagKey(kv.key);
}
loop {

View File

@ -19,7 +19,6 @@
*/
#include "fdbclient/Notified.h"
#include "fdbclient/TransactionLineage.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/LogSystemDiskQueueAdapter.h"
#include "fdbclient/CommitProxyInterface.h"
@ -356,11 +355,8 @@ ACTOR Future<Void> queueGetReadVersionRequests(Reference<AsyncVar<ServerDBInfo>>
GrvProxyStats* stats,
GrvTransactionRateInfo* batchRateInfo,
TransactionTagMap<uint64_t>* transactionTagCounter) {
currentLineage->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetConsistentReadVersion;
loop choose {
when(GetReadVersionRequest req = waitNext(readVersionRequests)) {
auto lineage = make_scoped_lineage(&TransactionLineage::txID, req.spanContext.first());
// currentLineage->modify(&TransactionLineage::txID) =
// WARNING: this code is run at a high priority, so it needs to do as little work as possible
if (stats->txnRequestIn.getValue() - stats->txnRequestOut.getValue() >
SERVER_KNOBS->START_TRANSACTION_MAX_QUEUE_SIZE) {
@ -654,7 +650,6 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
state Span span;
state int64_t midShardSize = SERVER_KNOBS->MIN_SHARD_BYTES;
currentLineage->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetConsistentReadVersion;
addActor.send(monitorDDMetricsChanges(&midShardSize, db));
addActor.send(getRate(proxy.id(),

View File

@ -1,25 +0,0 @@
/*
* RoleLineage.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/RoleLineage.actor.h"
using namespace std::literals;
std::string_view RoleLineage::name = "RoleLineage"sv;

View File

@ -1,67 +0,0 @@
/*
* RoleLineage.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "flow/flow.h"
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_ROLE_LINEAGE_ACTOR_G_H)
#define FDBSERVER_ROLE_LINEAGE_ACTOR_G_H
#include "fdbserver/RoleLineage.actor.g.h"
#elif !defined(FDBSERVER_ROLE_LINEAGE_ACTOR_H)
#define FDBSERVER_ROLE_LINEAGE_ACTOR_H
#include "flow/singleton.h"
#include "fdbrpc/Locality.h"
#include "fdbclient/ActorLineageProfiler.h"
#include "fdbserver/WorkerInterface.actor.h"
#include <string_view>
#include <msgpack.hpp>
#include <any>
#include "flow/actorcompiler.h" // This must be the last include
struct RoleLineage : LineageProperties<RoleLineage> {
static std::string_view name;
ProcessClass::ClusterRole role = ProcessClass::NoRole;
bool isSet(ProcessClass::ClusterRole RoleLineage::*member) const { return this->*member != ProcessClass::NoRole; }
};
struct RoleLineageCollector : IALPCollector<RoleLineage> {
RoleLineageCollector() : IALPCollector() {}
std::optional<std::any> collect(ActorLineage* lineage) override {
auto res = lineage->get(&RoleLineage::role);
if (res.has_value()) {
return Role::get(res.value()).abbreviation;
} else {
return std::optional<std::any>();
}
}
};
// creates a new root and sets the role lineage
ACTOR template <class Fun>
Future<decltype(std::declval<Fun>()())> runInRole(Fun fun, ProcessClass::ClusterRole role) {
currentLineage->makeRoot();
currentLineage->modify(&RoleLineage::role) = role;
decltype(std::declval<Fun>()()) res = wait(fun());
return res;
}
#endif

View File

@ -1,23 +0,0 @@
#include "flow/flow.h"
#include <csignal>
#include <iostream>
#include <string_view>
// This is not yet correct, as this is not async safe
// However, this should be good enough for an initial
// proof of concept.
extern "C" void stackSignalHandler(int sig) {
auto stack = getActorStackTrace();
int i = 0;
while (!stack.empty()) {
auto s = stack.back();
stack.pop_back();
std::string_view n(reinterpret_cast<const char*>(s.begin()), s.size());
std::cout << i << ": " << n << std::endl;
++i;
}
}
void setupStackSignal() {
std::signal(SIGUSR1, &stackSignalHandler);
}

View File

@ -789,41 +789,6 @@ struct Role {
std::string abbreviation;
bool includeInTraceRoles;
static const Role& get(ProcessClass::ClusterRole role) {
switch (role) {
case ProcessClass::Storage:
return STORAGE_SERVER;
case ProcessClass::TLog:
return TRANSACTION_LOG;
case ProcessClass::CommitProxy:
return COMMIT_PROXY;
case ProcessClass::GrvProxy:
return GRV_PROXY;
case ProcessClass::Master:
return MASTER;
case ProcessClass::Resolver:
return RESOLVER;
case ProcessClass::LogRouter:
return LOG_ROUTER;
case ProcessClass::ClusterController:
return CLUSTER_CONTROLLER;
case ProcessClass::DataDistributor:
return DATA_DISTRIBUTOR;
case ProcessClass::Ratekeeper:
return RATEKEEPER;
case ProcessClass::StorageCache:
return STORAGE_CACHE;
case ProcessClass::Backup:
return BACKUP;
case ProcessClass::Worker:
return WORKER;
case ProcessClass::NoRole:
default:
ASSERT(false);
throw internal_error();
}
}
bool operator==(const Role& r) const { return roleName == r.roleName; }
bool operator!=(const Role& r) const { return !(*this == r); }

View File

@ -66,9 +66,7 @@
#include "flow/SystemMonitor.h"
#include "flow/TLSConfig.actor.h"
#include "flow/Tracing.h"
#include "flow/WriteOnlySet.h"
#include "flow/UnitTest.h"
#include "fdbclient/ActorLineageProfiler.h"
#if defined(__linux__) || defined(__FreeBSD__)
#include <execinfo.h>
@ -86,8 +84,6 @@
#include "flow/actorcompiler.h" // This must be the last #include.
using namespace std::literals;
// clang-format off
enum {
OPT_CONNFILE, OPT_SEEDCONNFILE, OPT_SEEDCONNSTRING, OPT_ROLE, OPT_LISTEN, OPT_PUBLICADDR, OPT_DATAFOLDER, OPT_LOGFOLDER, OPT_PARENTPID, OPT_TRACER, OPT_NEWCONSOLE,
@ -95,7 +91,7 @@ enum {
OPT_DCID, OPT_MACHINE_CLASS, OPT_BUGGIFY, OPT_VERSION, OPT_BUILD_FLAGS, OPT_CRASHONERROR, OPT_HELP, OPT_NETWORKIMPL, OPT_NOBUFSTDOUT, OPT_BUFSTDOUTERR,
OPT_TRACECLOCK, OPT_NUMTESTERS, OPT_DEVHELP, OPT_ROLLSIZE, OPT_MAXLOGS, OPT_MAXLOGSSIZE, OPT_KNOB, OPT_UNITTESTPARAM, OPT_TESTSERVERS, OPT_TEST_ON_SERVERS, OPT_METRICSCONNFILE,
OPT_METRICSPREFIX, OPT_LOGGROUP, OPT_LOCALITY, OPT_IO_TRUST_SECONDS, OPT_IO_TRUST_WARN_ONLY, OPT_FILESYSTEM, OPT_PROFILER_RSS_SIZE, OPT_KVFILE,
OPT_TRACE_FORMAT, OPT_WHITELIST_BINPATH, OPT_BLOB_CREDENTIAL_FILE, OPT_PROFILER
OPT_TRACE_FORMAT, OPT_WHITELIST_BINPATH, OPT_BLOB_CREDENTIAL_FILE
};
CSimpleOpt::SOption g_rgOptions[] = {
@ -175,10 +171,9 @@ CSimpleOpt::SOption g_rgOptions[] = {
{ OPT_METRICSPREFIX, "--metrics_prefix", SO_REQ_SEP },
{ OPT_IO_TRUST_SECONDS, "--io_trust_seconds", SO_REQ_SEP },
{ OPT_IO_TRUST_WARN_ONLY, "--io_trust_warn_only", SO_NONE },
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
{ OPT_TRACE_FORMAT , "--trace_format", SO_REQ_SEP },
{ OPT_WHITELIST_BINPATH, "--whitelist_binpath", SO_REQ_SEP },
{ OPT_BLOB_CREDENTIAL_FILE, "--blob_credential_file", SO_REQ_SEP },
{ OPT_PROFILER, "--profiler_", SO_REQ_SEP},
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
@ -622,11 +617,6 @@ static void printUsage(const char* name, bool devhelp) {
" Machine class (valid options are storage, transaction,"
" resolution, grv_proxy, commit_proxy, master, test, unset, stateless, log, router,"
" and cluster_controller).");
printOptionUsage("--profiler_",
"Set an actor profiler option. Supported options are:\n"
" collector -- None or FluentD (FluentD requires collector_endpoint to be set)\n"
" collector_endpoint -- IP:PORT of the fluentd server\n"
" collector_protocol -- UDP or TCP (default is UDP)");
#ifndef TLS_DISABLED
printf(TLS_HELP);
#endif
@ -990,8 +980,6 @@ struct CLIOptions {
Standalone<StringRef> machineId;
UnitTestParameters testParams;
std::map<std::string, std::string> profilerConfig;
static CLIOptions parseArgs(int argc, char* argv[]) {
CLIOptions opts;
opts.parseArgsInternal(argc, argv);
@ -1065,18 +1053,6 @@ private:
knobs.push_back(std::make_pair(syn, args.OptionArg()));
break;
}
case OPT_PROFILER: {
std::string syn = args.OptionSyntax();
std::string_view key = syn;
auto prefix = "--profiler_"sv;
if (key.find(prefix) != 0) {
fprintf(stderr, "ERROR: unable to parse profiler option '%s'\n", syn.c_str());
flushAndExit(FDB_EXIT_ERROR);
}
key.remove_prefix(prefix.size());
profilerConfig.emplace(key, args.OptionArg());
break;
};
case OPT_UNITTESTPARAM: {
std::string syn = args.OptionSyntax();
if (!StringRef(syn).startsWith(LiteralStringRef("--test_"))) {
@ -1477,13 +1453,6 @@ private:
}
}
try {
ProfilerConfig::instance().reset(profilerConfig);
} catch (ConfigError& e) {
printf("Error seting up profiler: %s", e.description.c_str());
flushAndExit(FDB_EXIT_ERROR);
}
if (seedConnString.length() && seedConnFile.length()) {
fprintf(
stderr, "%s\n", "--seed_cluster_file and --seed_connection_string may not both be specified at once.");
@ -1624,9 +1593,6 @@ private:
} // namespace
int main(int argc, char* argv[]) {
// TODO: Remove later, this is just to force the statics to be initialized
// otherwise the unit test won't run
ActorLineageSet _;
try {
platformInit();

View File

@ -42,7 +42,6 @@
#include "fdbclient/Notified.h"
#include "fdbclient/StatusClient.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/TransactionLineage.h"
#include "fdbclient/VersionedMap.h"
#include "fdbserver/FDBExecHelper.actor.h"
#include "fdbserver/IKeyValueStore.h"
@ -1105,7 +1104,6 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
state int64_t resultSize = 0;
Span span("SS:getValue"_loc, { req.spanContext });
span.addTag("key"_sr, req.key);
currentLineage->modify(&TransactionLineage::txID) = req.spanContext.first();
try {
++data->counters.getValueQueries;
@ -1802,7 +1800,6 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
{
state Span span("SS:getKeyValues"_loc, { req.spanContext });
state int64_t resultSize = 0;
currentLineage->modify(&TransactionLineage::txID) = req.spanContext.first();
++data->counters.getRangeQueries;
++data->counters.allQueries;
@ -1963,7 +1960,6 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
ACTOR Future<Void> getKeyQ(StorageServer* data, GetKeyRequest req) {
state Span span("SS:getKey"_loc, { req.spanContext });
state int64_t resultSize = 0;
currentLineage->modify(&TransactionLineage::txID) = req.spanContext.first();
++data->counters.getKeyQueries;
++data->counters.allQueries;
@ -4337,7 +4333,6 @@ ACTOR Future<Void> checkBehind(StorageServer* self) {
}
ACTOR Future<Void> serveGetValueRequests(StorageServer* self, FutureStream<GetValueRequest> getValue) {
currentLineage->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetValue;
loop {
GetValueRequest req = waitNext(getValue);
// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade
@ -4355,7 +4350,6 @@ ACTOR Future<Void> serveGetValueRequests(StorageServer* self, FutureStream<GetVa
}
ACTOR Future<Void> serveGetKeyValuesRequests(StorageServer* self, FutureStream<GetKeyValuesRequest> getKeyValues) {
currentLineage->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetKeyValues;
loop {
GetKeyValuesRequest req = waitNext(getKeyValues);
// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade
@ -4365,7 +4359,6 @@ ACTOR Future<Void> serveGetKeyValuesRequests(StorageServer* self, FutureStream<G
}
ACTOR Future<Void> serveGetKeyRequests(StorageServer* self, FutureStream<GetKeyRequest> getKey) {
currentLineage->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetKey;
loop {
GetKeyRequest req = waitNext(getKey);
// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade
@ -4378,7 +4371,6 @@ ACTOR Future<Void> watchValueWaitForVersion(StorageServer* self,
WatchValueRequest req,
PromiseStream<WatchValueRequest> stream) {
state Span span("SS:watchValueWaitForVersion"_loc, { req.spanContext });
currentLineage->modify(&TransactionLineage::txID) = req.spanContext.first();
try {
wait(success(waitForVersionNoTooOld(self, req.version)));
stream.send(req);
@ -4392,11 +4384,9 @@ ACTOR Future<Void> watchValueWaitForVersion(StorageServer* self,
ACTOR Future<Void> serveWatchValueRequestsImpl(StorageServer* self, FutureStream<WatchValueRequest> stream) {
loop {
currentLineage->modify(&TransactionLineage::txID) = 0;
state WatchValueRequest req = waitNext(stream);
state Reference<ServerWatchMetadata> metadata = self->getWatchMetadata(req.key.contents());
state Span span("SS:serveWatchValueRequestsImpl"_loc, { req.spanContext });
currentLineage->modify(&TransactionLineage::txID) = req.spanContext.first();
if (!metadata.isValid()) { // case 1: no watch set for the current key
metadata = makeReference<ServerWatchMetadata>(req.key, req.value, req.version, req.tags, req.debugID);
@ -4470,7 +4460,6 @@ ACTOR Future<Void> serveWatchValueRequestsImpl(StorageServer* self, FutureStream
ACTOR Future<Void> serveWatchValueRequests(StorageServer* self, FutureStream<WatchValueRequest> watchValue) {
state PromiseStream<WatchValueRequest> stream;
currentLineage->modify(&TransactionLineage::operation) = TransactionLineage::Operation::WatchValue;
self->actors.add(serveWatchValueRequestsImpl(self, stream.getFuture()));
loop {

View File

@ -22,8 +22,6 @@
#include <boost/lexical_cast.hpp>
#include "fdbrpc/Locality.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/ProcessInterface.h"
#include "fdbclient/StorageServerInterface.h"
#include "fdbserver/Knobs.h"
#include "flow/ActorCollection.h"
@ -34,7 +32,6 @@
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/MetricLogger.actor.h"
#include "fdbserver/BackupInterface.h"
#include "fdbserver/RoleLineage.actor.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/WaitFailure.h"
@ -50,7 +47,6 @@
#include "flow/Profiler.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/Trace.h"
#include "flow/flow.h"
#include "flow/network.h"
#ifdef __linux__
@ -80,10 +76,6 @@ extern IKeyValueStore* keyValueStoreCompressTestData(IKeyValueStore* store);
#define KV_STORE(filename, uid) keyValueStoreMemory(filename, uid)
#endif
namespace {
RoleLineageCollector roleLineageCollector;
}
ACTOR Future<std::vector<Endpoint>> tryDBInfoBroadcast(RequestStream<UpdateServerDBInfoRequest> stream,
UpdateServerDBInfoRequest req) {
ErrorOr<std::vector<Endpoint>> rep =
@ -1049,8 +1041,6 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
metricsLogger = runMetrics(openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, true, lockAware),
KeyRef(metricsPrefix));
}
GlobalConfig::globalConfig().trigger(samplingFrequency, samplingProfilerUpdateFrequency);
}
errorForwarders.add(resetAfter(degraded,
@ -1098,8 +1088,6 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
DiskStore s = stores[f];
// FIXME: Error handling
if (s.storedComponent == DiskStore::Storage) {
LocalLineage _;
currentLineage->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Storage;
IKeyValueStore* kv =
openKVStore(s.storeType, s.filename, s.storeID, memoryLimit, false, validateDataFiles);
Future<Void> kvClosed = kv->onClosed();
@ -1144,8 +1132,6 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
kv);
errorForwarders.add(forwardError(errors, Role::STORAGE_SERVER, recruited.id(), f));
} else if (s.storedComponent == DiskStore::TLogData) {
LocalLineage _;
currentLineage->modify(&RoleLineage::role) = ProcessClass::ClusterRole::TLog;
std::string logQueueBasename;
const std::string filename = basename(s.filename);
if (StringRef(filename).startsWith(fileLogDataPrefix)) {
@ -1345,8 +1331,6 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
}
}
when(RecruitMasterRequest req = waitNext(interf.master.getFuture())) {
LocalLineage _;
currentLineage->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Master;
MasterInterface recruited;
recruited.locality = locality;
recruited.initEndpoints();
@ -1369,8 +1353,6 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
req.reply.send(recruited);
}
when(InitializeDataDistributorRequest req = waitNext(interf.dataDistributor.getFuture())) {
LocalLineage _;
currentLineage->modify(&RoleLineage::role) = ProcessClass::ClusterRole::DataDistributor;
DataDistributorInterface recruited(locality);
recruited.initEndpoints();
@ -1393,8 +1375,6 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
req.reply.send(recruited);
}
when(InitializeRatekeeperRequest req = waitNext(interf.ratekeeper.getFuture())) {
LocalLineage _;
currentLineage->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Ratekeeper;
RatekeeperInterface recruited(locality, req.reqId);
recruited.initEndpoints();
@ -1421,8 +1401,6 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
}
when(InitializeBackupRequest req = waitNext(interf.backup.getFuture())) {
if (!backupWorkerCache.exists(req.reqId)) {
LocalLineage _;
currentLineage->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Backup;
BackupInterface recruited(locality);
recruited.initEndpoints();
@ -1452,8 +1430,6 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
.detail("MinRecruitable", TLogVersion::MIN_RECRUITABLE);
req.reply.sendError(internal_error());
}
LocalLineage _;
currentLineage->modify(&RoleLineage::role) = ProcessClass::ClusterRole::TLog;
TLogOptions tLogOptions(req.logVersion, req.spillType);
TLogFn tLogFn = tLogFnForOptions(tLogOptions);
auto& logData = sharedLogs[SharedLogsKey(tLogOptions, req.storeType)];
@ -1507,8 +1483,6 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
}
when(InitializeStorageRequest req = waitNext(interf.storage.getFuture())) {
if (!storageCache.exists(req.reqId)) {
LocalLineage _;
currentLineage->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Storage;
StorageServerInterface recruited(req.interfaceId);
recruited.locality = locality;
recruited.initEndpoints();
@ -1557,8 +1531,6 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
forwardPromise(req.reply, storageCache.get(req.reqId));
}
when(InitializeCommitProxyRequest req = waitNext(interf.commitProxy.getFuture())) {
LocalLineage _;
currentLineage->modify(&RoleLineage::role) = ProcessClass::ClusterRole::CommitProxy;
CommitProxyInterface recruited;
recruited.processId = locality.processId();
recruited.provisional = false;
@ -1584,8 +1556,6 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
req.reply.send(recruited);
}
when(InitializeGrvProxyRequest req = waitNext(interf.grvProxy.getFuture())) {
LocalLineage _;
currentLineage->modify(&RoleLineage::role) = ProcessClass::ClusterRole::GrvProxy;
GrvProxyInterface recruited;
recruited.processId = locality.processId();
recruited.provisional = false;
@ -1606,8 +1576,6 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
req.reply.send(recruited);
}
when(InitializeResolverRequest req = waitNext(interf.resolver.getFuture())) {
LocalLineage _;
currentLineage->modify(&RoleLineage::role) = ProcessClass::ClusterRole::Resolver;
ResolverInterface recruited;
recruited.locality = locality;
recruited.initEndpoints();
@ -1625,8 +1593,6 @@ ACTOR Future<Void> workerServer(Reference<ClusterConnectionFile> connFile,
req.reply.send(recruited);
}
when(InitializeLogRouterRequest req = waitNext(interf.logRouter.getFuture())) {
LocalLineage _;
currentLineage->modify(&RoleLineage::role) = ProcessClass::ClusterRole::LogRouter;
TLogInterface recruited(locality);
recruited.initEndpoints();
@ -2034,8 +2000,6 @@ ACTOR Future<Void> monitorLeaderRemotelyWithDelayedCandidacy(
}
}
extern void setupStackSignal();
ACTOR Future<Void> serveProtocolInfo() {
state RequestStream<ProtocolInfoRequest> protocolInfo(
PeerCompatibilityPolicy{ RequirePeer::AtLeast, ProtocolVersion::withStableInterfaces() });
@ -2046,37 +2010,6 @@ ACTOR Future<Void> serveProtocolInfo() {
}
}
// Handles requests from ProcessInterface, an interface meant for direct
// communication between the client and FDB processes.
ACTOR Future<Void> serveProcess() {
state ProcessInterface process;
process.getInterface.makeWellKnownEndpoint(WLTOKEN_PROCESS, TaskPriority::DefaultEndpoint);
loop {
choose {
when(GetProcessInterfaceRequest req = waitNext(process.getInterface.getFuture())) {
req.reply.send(process);
}
when(ActorLineageRequest req = waitNext(process.actorLineage.getFuture())) {
state SampleCollection sampleCollector;
auto samples = sampleCollector->get(req.timeStart, req.timeEnd);
std::vector<SerializedSample> serializedSamples;
for (const auto& samplePtr : samples) {
auto serialized = SerializedSample{ .time = samplePtr->time };
for (const auto& [waitState, pair] : samplePtr->data) {
if (waitState >= req.waitStateStart && waitState <= req.waitStateEnd) {
serialized.data[waitState] = std::string(pair.first, pair.second);
}
}
serializedSamples.push_back(std::move(serialized));
}
ActorLineageReply reply{ serializedSamples };
req.reply.send(reply);
}
}
}
}
ACTOR Future<Void> fdbd(Reference<ClusterConnectionFile> connFile,
LocalityData localities,
ProcessClass processClass,
@ -2089,11 +2022,8 @@ ACTOR Future<Void> fdbd(Reference<ClusterConnectionFile> connFile,
std::string whitelistBinPaths) {
state vector<Future<Void>> actors;
state Promise<Void> recoveredDiskFiles;
setupStackSignal();
currentLineage->modify(&RoleLineage::role) = ProcessClass::Worker;
actors.push_back(serveProtocolInfo());
actors.push_back(serveProcess());
try {
ServerCoordinators coordinators(connFile);

View File

@ -69,7 +69,6 @@ set(FLOW_SRCS
TreeBenchmark.h
UnitTest.cpp
UnitTest.h
WriteOnlySet.actor.cpp
XmlTraceLogFormatter.cpp
XmlTraceLogFormatter.h
actorcompiler.h

View File

@ -204,8 +204,6 @@ public:
bool checkRunnable() override;
ActorLineageSet& getActorLineageSet() override;
bool useThreadPool;
// private:
@ -228,15 +226,11 @@ public:
TaskPriority currentTaskID;
uint64_t tasksIssued;
TDMetricCollection tdmetrics;
// we read now() from a different thread. On Intel, reading a double is atomic anyways, but on other platforms it's
// not. For portability this should be atomic
std::atomic<double> currentTime;
double currentTime;
// May be accessed off the network thread, e.g. by onMainThread
std::atomic<bool> stopped;
mutable std::map<IPAddress, bool> addressOnHostCache;
ActorLineageSet actorLineageSet;
std::atomic<bool> started;
uint64_t numYields;
@ -1389,10 +1383,6 @@ bool Net2::checkRunnable() {
return !started.exchange(true);
}
ActorLineageSet& Net2::getActorLineageSet() {
return actorLineageSet;
}
void Net2::run() {
TraceEvent::setNetworkThread();
TraceEvent("Net2Running");

View File

@ -48,10 +48,6 @@
#include "flow/UnitTest.h"
#include "flow/FaultInjection.h"
#include "fdbrpc/IAsyncFile.h"
#include "fdbclient/AnnotateActor.h"
#ifdef _WIN32
#include <windows.h>
#include <winioctl.h>

View File

@ -791,17 +791,17 @@ inline void fdb_probe_actor_exit(const char* name, unsigned long id, int index)
#include <inttypes.h>
static inline uint32_t hwCrc32cU8(unsigned int crc, unsigned char v) {
uint32_t ret;
asm volatile("crc32cb %w[r], %w[c], %w[v]" : [ r ] "=r"(ret) : [ c ] "r"(crc), [ v ] "r"(v));
asm volatile("crc32cb %w[r], %w[c], %w[v]" : [r] "=r"(ret) : [c] "r"(crc), [v] "r"(v));
return ret;
}
static inline uint32_t hwCrc32cU32(unsigned int crc, unsigned int v) {
uint32_t ret;
asm volatile("crc32cw %w[r], %w[c], %w[v]" : [ r ] "=r"(ret) : [ c ] "r"(crc), [ v ] "r"(v));
asm volatile("crc32cw %w[r], %w[c], %w[v]" : [r] "=r"(ret) : [c] "r"(crc), [v] "r"(v));
return ret;
}
static inline uint64_t hwCrc32cU64(uint64_t crc, uint64_t v) {
uint64_t ret;
asm volatile("crc32cx %w[r], %w[c], %x[v]" : [ r ] "=r"(ret) : [ c ] "r"(crc), [ v ] "r"(v));
asm volatile("crc32cx %w[r], %w[c], %x[v]" : [r] "=r"(ret) : [c] "r"(crc), [v] "r"(v));
return ret;
}
#else

View File

@ -142,8 +142,6 @@ struct Profiler {
}
void signal_handler() { // async signal safe!
static std::atomic<bool> inSigHandler = false;
if (inSigHandler.exchange(true)) { return; }
if (profilingEnabled) {
double t = timer();
output_buffer->push(*(void**)&t);
@ -152,7 +150,6 @@ struct Profiler {
output_buffer->push(addresses[i]);
output_buffer->push((void*)-1LL);
}
inSigHandler.store(false);
}
static void signal_handler_for_closure(int, siginfo_t* si, void*, void* self) { // async signal safe!

View File

@ -1,273 +0,0 @@
/*
* WriteOnlySet.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/DeterministicRandom.h"
#include "flow/WriteOnlySet.h"
#include "flow/flow.h"
#include "flow/UnitTest.h"
#include <chrono>
#include <random>
#include "flow/actorcompiler.h" // has to be last include
template <class T, class IndexType, IndexType CAPACITY>
auto WriteOnlySet<T, IndexType, CAPACITY>::insert(const Reference<T>& lineage) -> Index {
Index res;
if (!freeQueue.pop(res)) {
TraceEvent(SevWarnAlways, "NoCapacityInWriteOnlySet");
return npos;
}
ASSERT(_set[res].load() == 0);
auto ptr = reinterpret_cast<uintptr_t>(lineage.getPtr());
ASSERT((ptr % 2) == 0); // this needs to be at least 2-byte aligned
ASSERT(ptr != 0);
lineage->addref();
_set[res].store(ptr);
return res;
}
template <class T, class IndexType, IndexType CAPACITY>
bool WriteOnlySet<T, IndexType, CAPACITY>::eraseImpl(Index idx) {
while (true) {
auto ptr = _set[idx].load();
if (ptr & LOCK) {
_set[idx].store(0);
freeList.push(reinterpret_cast<T*>(ptr ^ LOCK));
return false;
} else {
if (_set[idx].compare_exchange_strong(ptr, 0)) {
reinterpret_cast<T*>(ptr)->delref();
return true;
}
}
}
}
template <class T, class IndexType, IndexType CAPACITY>
bool WriteOnlySet<T, IndexType, CAPACITY>::erase(Index idx) {
ASSERT(idx >= 0 && idx < CAPACITY);
auto res = eraseImpl(idx);
ASSERT(freeQueue.push(idx));
return res;
}
template <class T, class IndexType, IndexType CAPACITY>
bool WriteOnlySet<T, IndexType, CAPACITY>::replace(Index idx, const Reference<T>& lineage) {
auto lineagePtr = reinterpret_cast<uintptr_t>(lineage.getPtr());
if (lineage.isValid()) {
lineage->addref();
}
ASSERT((lineagePtr % 2) == 0); // this needs to be at least 2-byte aligned
while (true) {
auto ptr = _set[idx].load();
if (ptr & LOCK) {
_set[idx].store(lineagePtr);
ASSERT(freeList.push(reinterpret_cast<T*>(ptr ^ LOCK)));
return false;
} else {
if (_set[idx].compare_exchange_strong(ptr, lineagePtr)) {
if (ptr) {
reinterpret_cast<T*>(ptr)->delref();
}
return ptr != 0;
}
}
}
}
template <class T, class IndexType, IndexType CAPACITY>
WriteOnlySet<T, IndexType, CAPACITY>::WriteOnlySet() : _set(CAPACITY) {
// insert the free indexes in reverse order
for (unsigned i = CAPACITY; i > 0; --i) {
freeQueue.push(i - 1);
std::atomic_init(&_set[i - 1], uintptr_t(0));
}
}
template <class T, class IndexType, IndexType CAPACITY>
std::vector<Reference<T>> WriteOnlySet<T, IndexType, CAPACITY>::copy() {
std::vector<Reference<T>> result;
for (int i = 0; i < CAPACITY; ++i) {
auto ptr = _set[i].load();
if (ptr) {
ASSERT((ptr & LOCK) == 0); // if we lock something we need to immediately unlock after we're done copying
// We attempt lock so this won't get deleted. We will try this only once, if the other thread removed the
// object from the set between the previews lines and now, we just won't make it part of the result.
if (_set[i].compare_exchange_strong(ptr, ptr | LOCK)) {
T* entry = reinterpret_cast<T*>(ptr);
ptr |= LOCK;
entry->addref();
// we try to unlock now. If this element was removed while we incremented the refcount, the element will
// end up in the freeList, so we will decrement later.
_set[i].compare_exchange_strong(ptr, ptr ^ LOCK);
result.push_back(Reference(entry));
}
}
}
// after we're done we need to clean up all objects that contented on a lock. This won't be perfect (as some thread
// might not yet added the object to the free list), but whatever we don't get now we'll clean up in the next
// iteration
freeList.consume_all([](auto toClean) { toClean->delref(); });
return result;
}
template <class T, class IndexType>
WriteOnlyVariable<T, IndexType>::WriteOnlyVariable() : WriteOnlySet<T, IndexType, 1>() {}
template <class T, class IndexType>
Reference<T> WriteOnlyVariable<T, IndexType>::get() {
auto result = WriteOnlySet<T, IndexType, 1>::copy();
return result.size() ? result.at(0) : Reference<T>();
}
template <class T, class IndexType>
bool WriteOnlyVariable<T, IndexType>::replace(const Reference<T>& element) {
return WriteOnlySet<T, IndexType, 1>::replace(0, element);
}
// Explicit instantiation
template class WriteOnlySet<ActorLineage, unsigned, 1024>;
template class WriteOnlyVariable<ActorLineage, unsigned>;
// testing code
namespace {
// Some statistics
std::atomic<unsigned long> instanceCounter = 0;
std::atomic<unsigned long> numInserts = 0;
std::atomic<unsigned long> numErase = 0;
std::atomic<unsigned long> numLockedErase = 0;
std::atomic<unsigned long> numCopied = 0;
// A simple object that counts the number of its instances. This is used to detect memory leaks.
struct TestObject {
mutable std::atomic<unsigned> _refCount = 1;
TestObject() { instanceCounter.fetch_add(1); }
void delref() const {
if (--_refCount == 0) {
delete this;
--instanceCounter;
}
}
void addref() const { ++_refCount; }
};
using TestSet = WriteOnlySet<TestObject, unsigned, 128>;
using Clock = std::chrono::steady_clock;
// An actor that can join a set of threads in an async way.
ACTOR Future<Void> threadjoiner(std::shared_ptr<std::vector<std::thread>> threads, std::shared_ptr<TestSet> set) {
loop {
wait(delay(0.1));
for (unsigned i = 0;;) {
if (threads->size() == i) {
break;
}
auto& t = (*threads)[i];
if (t.joinable()) {
t.join();
if (i + 1 < threads->size()) {
std::swap(*threads->rbegin(), (*threads)[i]);
}
threads->pop_back();
} else {
++i;
}
}
if (threads->empty()) {
set->copy();
ASSERT(instanceCounter.load() == 0);
return Void();
}
}
}
// occasionally copy the contents of the past set.
void testCopier(std::shared_ptr<TestSet> set, std::chrono::seconds runFor) {
auto start = Clock::now();
while (true) {
if (Clock::now() - start > runFor) {
return;
}
auto copy = set->copy();
numCopied.fetch_add(copy.size());
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
// In a loop adds and removes a set of objects to the set
void writer(std::shared_ptr<TestSet> set, std::chrono::seconds runFor) {
auto start = Clock::now();
std::random_device rDev;
DeterministicRandom rnd(rDev());
while (true) {
unsigned inserts = 0, erases = 0;
if (Clock::now() - start > runFor) {
return;
}
std::vector<TestSet::Index> positions;
for (int i = 0; i < rnd.randomInt(1, 101); ++i) {
Reference<TestObject> o(new TestObject());
auto pos = set->insert(o);
if (pos == TestSet::npos) {
// could not insert -- ignore
break;
}
++inserts;
ASSERT(pos < TestSet::capacity);
positions.push_back(pos);
}
rnd.randomShuffle(positions);
for (auto p : positions) {
if (!set->erase(p)) {
++numLockedErase;
}
++erases;
}
numInserts.fetch_add(inserts);
numErase.fetch_add(erases);
ASSERT(inserts == erases);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
// This unit test creates 5 writer threads and one copier thread.
TEST_CASE("/flow/WriteOnlySet") {
if (g_network->isSimulated()) {
// This test is not deterministic, so we shouldn't run it in simulation
return Void();
}
auto set = std::make_shared<TestSet>();
auto threads = std::make_shared<std::vector<std::thread>>();
std::chrono::seconds runFor(10);
for (int i = 0; i < 5; ++i) {
threads->emplace_back([set, runFor]() { writer(set, runFor); });
}
threads->emplace_back([set, runFor]() { testCopier(set, runFor); });
wait(threadjoiner(threads, set));
TraceEvent("WriteOnlySetTestResult")
.detail("Inserts", numInserts.load())
.detail("Erases", numErase.load())
.detail("Copies", numCopied.load())
.detail("LockedErase", numLockedErase.load());
return Void();
}
} // namespace

View File

@ -1,162 +0,0 @@
/*
* WriteOnlySet.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "flow/Error.h"
#include "flow/FastRef.h"
#include "flow/Trace.h"
#include <boost/lockfree/queue.hpp>
/**
* This is a Write-Only set that supports copying the whole content. This data structure is lock-free and allows a user
* to insert and remove objects up to a given capacity (passed by a template).
*
* Template parameters:
* \param T The type to store.
* \param IndexType The type used as an index
* \param CAPACITY The maximum number of object this structure can store (if a user tries to store more, insert will
* fail gracefully)
* \pre T implements `void addref() const` and `void delref() const`
* \pre IndexType must have a copy constructor
* \pre IndexType must have a trivial assignment operator
* \pre IndexType must have a trivial destructor
* \pre IndexType can be used as an index into a std::vector
*/
template <class T, class IndexType, IndexType CAPACITY>
class WriteOnlySet {
public:
// The type we use for lookup into the set. Gets assigned during insert
using Index = IndexType;
// For now we use a fixed size capacity
constexpr static Index npos = std::numeric_limits<Index>::max();
constexpr static IndexType capacity = CAPACITY;
explicit WriteOnlySet();
WriteOnlySet(const WriteOnlySet&) = delete;
WriteOnlySet(WriteOnlySet&&) = delete;
WriteOnlySet& operator=(const WriteOnlySet&) = delete;
WriteOnlySet& operator=(WriteOnlySet&&) = delete;
/**
* Attempts to insert \p lineage into the set. This method can fail if the set is full (its size is equal to its
* capacity). Calling insert on a full set is safe but the method will return \ref npos if the operation fails.
*
* \param lineage A reference to the object the user wants to insert.
* \ret An index that can later be used to erase the value again or \ref npos if the insert failed.
* \pre lineage.getPtr() % 2 == 0 (the memory for lineage has to be at least 2 byte aligned)
*/
[[nodiscard]] Index insert(const Reference<T>& lineage);
/**
* Erases the object associated with \p idx from the set.
*
* \ret Whether the reference count was decremented. Usually the return value is only interesting for testing and
* benchmarking purposes and will in most cases be ignored. If \ref delref wasn't called, it will be called
* later. Note that at the time the return value is checked, \ref delref might already have been called.
*/
bool erase(Index idx);
/**
* Replaces the object associated with \p idx with \p lineage.
*
* \ret Whether the reference count of the replaced object was decremented. Usually the return value is only
* interesting for testing and benchmarking purposes and will in most cases be ignored. If \ref delref
* wasn't called, it will be called later. Note that at the time the return value is checked, \ref delref
* might already have been called.
*/
bool replace(Index idx, const Reference<T>& lineage);
/**
* Copies all elements that are stored in the set into a vector. This copy operation does NOT provide a snapshot of
* the data structure. The contract is weak:
* - All object that were in the set before copy is called and weren't removed until after copy returned are
* guaranteed to be in the result.
* - Any object that was inserted while copy is running might be in the result.
* - Any object that was erased while copy is running might be in the result.
*/
std::vector<Reference<T>> copy();
protected:
// the implementation of erase -- the wrapper just makes the function a bit more readable.
bool eraseImpl(Index idx);
// the last bit of a pointer within the set is used like a boolean and true means that the object is locked. Locking
// an object is only relevant for memory management. A locked pointer can still be erased from the set, but the
// erase won't call delref on the object. Instead it will push the pointer into the \ref freeList and copy will call
// delref later.
static constexpr uintptr_t LOCK = 0b1;
// The actual memory
std::vector<std::atomic<std::uintptr_t>> _set;
static_assert(std::atomic<Index>::is_always_lock_free, "Index type can't be used as a lock-free type");
static_assert(std::atomic<uintptr_t>::is_always_lock_free, "uintptr_t can't be used as a lock-free type");
// The freeQueue. On creation all indexes (0..capacity-1) are pushed into this queue. On insert one element from
// this queue is consumed and the resulting number is used as an index into the set. On erase the index is given
// back to the freeQueue.
boost::lockfree::queue<Index, boost::lockfree::fixed_sized<true>, boost::lockfree::capacity<CAPACITY>> freeQueue;
// The freeList is used for memory management. Generally copying a shared pointer can't be done in a lock-free way.
// Instead, when we copy the data structure we first copy the address, then attempt to set the last bit to 1 and
// only if that succeeds we will increment the reference count. Whenever we attempt to remove an object
// in \ref erase we remove the object from the set (using an atomic compare and swap) and only decrement the
// reference count if the last bit is 0. If it's not we'll push the pointer into this free list.
// \ref copy will consume all elements from this freeList each time it runs and decrements the refcount for each
// element.
boost::lockfree::queue<T*, boost::lockfree::fixed_sized<true>, boost::lockfree::capacity<CAPACITY>> freeList;
};
/**
* Provides a thread safe, lock-free write only variable.
*
* Template parameters:
* \param T The type to store.
* \param IndexType The type used as an index
* \pre T implements `void addref() const` and `void delref() const`
* \pre IndexType must have a copy constructor
* \pre IndexType must have a trivial assignment operator
* \pre IndexType must have a trivial destructor
* \pre IndexType can be used as an index into a std::vector
*/
template <class T, class IndexType>
class WriteOnlyVariable : private WriteOnlySet<T, IndexType, 1> {
public:
explicit WriteOnlyVariable();
/**
* Returns a copied reference to the stored variable.
*/
Reference<T> get();
/**
* Replaces the variable with \p lineage. \p lineage is permitted to be an invalid pointer.
*
* \ret Whether the reference count of the replaced object was decremented. Note that if the reference being replaced
* is invalid, this function will always return false. If \ref delref wasn't called and the reference was valid,
* it will be called later. Note that at the time the return value is checked, \ref delref might already have
* been called.
*/
bool replace(const Reference<T>& element);
};
class ActorLineage;
extern template class WriteOnlySet<ActorLineage, unsigned, 1024>;
using ActorLineageSet = WriteOnlySet<ActorLineage, unsigned, 1024>;

View File

@ -452,7 +452,6 @@ namespace actorcompiler
fullClassName,
string.Join(", ", actor.parameters.Select(p => p.name).ToArray()));
writer.WriteLine("\trestore_lineage _;");
if (actor.returnType != null)
writer.WriteLine("\treturn Future<{1}>({0});", newActor, actor.returnType);
else
@ -1287,7 +1286,6 @@ namespace actorcompiler
constructor.WriteLine("{");
constructor.Indent(+1);
ProbeEnter(constructor, actor.name);
constructor.WriteLine("currentLineage->modify(&StackLineage::actorName) = LiteralStringRef(\"{0}\");", actor.name);
constructor.WriteLine("this->{0};", body.call());
ProbeExit(constructor, actor.name);
WriteFunction(writer, constructor, constructor.BodyText);

View File

@ -1,8 +1,108 @@
<Project Sdk="Microsoft.NET.Sdk">
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<ProductVersion>10.0.20506</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{0ECC1314-3FC2-458D-8E41-B50B4EA24E51}</ProjectGuid>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>actorcompiler</RootNamespace>
<AssemblyName>actorcompiler</AssemblyName>
<TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<OutputPath>$(SolutionDir)bin\$(Configuration)\</OutputPath>
<PublishUrl>publish\</PublishUrl>
<Install>true</Install>
<InstallFrom>Disk</InstallFrom>
<UpdateEnabled>false</UpdateEnabled>
<UpdateMode>Foreground</UpdateMode>
<UpdateInterval>7</UpdateInterval>
<UpdateIntervalUnits>Days</UpdateIntervalUnits>
<UpdatePeriodically>false</UpdatePeriodically>
<UpdateRequired>false</UpdateRequired>
<MapFileExtensions>true</MapFileExtensions>
<ApplicationRevision>0</ApplicationRevision>
<ApplicationVersion>1.0.0.%2a</ApplicationVersion>
<IsWebBootstrapper>false</IsWebBootstrapper>
<UseApplicationTrust>false</UseApplicationTrust>
<BootstrapperEnabled>true</BootstrapperEnabled>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Debug|AnyCPU'">
<DebugSymbols>true</DebugSymbols>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<DebugType>full</DebugType>
<PlatformTarget>AnyCPU</PlatformTarget>
<LangVersion>default</LangVersion>
<ErrorReport>prompt</ErrorReport>
<CodeAnalysisIgnoreBuiltInRuleSets>false</CodeAnalysisIgnoreBuiltInRuleSets>
<CodeAnalysisFailOnMissingRules>false</CodeAnalysisFailOnMissingRules>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Release|AnyCPU'">
<DefineConstants>TRACE</DefineConstants>
<Optimize>true</Optimize>
<DebugType>pdbonly</DebugType>
<PlatformTarget>AnyCPU</PlatformTarget>
<LangVersion>default</LangVersion>
<ErrorReport>prompt</ErrorReport>
<CodeAnalysisIgnoreBuiltInRuleSets>false</CodeAnalysisIgnoreBuiltInRuleSets>
<CodeAnalysisIgnoreBuiltInRules>false</CodeAnalysisIgnoreBuiltInRules>
</PropertyGroup>
<ItemGroup>
<Reference Include="System" />
<Reference Include="System.Core">
<RequiredTargetFramework>3.5</RequiredTargetFramework>
</Reference>
<Reference Include="System.Xml.Linq">
<RequiredTargetFramework>3.5</RequiredTargetFramework>
</Reference>
<Reference Include="System.Data.DataSetExtensions">
<RequiredTargetFramework>3.5</RequiredTargetFramework>
</Reference>
<Reference Include="Microsoft.CSharp">
<RequiredTargetFramework>4.0</RequiredTargetFramework>
</Reference>
<Reference Include="System.Data" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="ActorCompiler.cs" />
<Compile Include="ActorParser.cs" />
<Compile Include="ParseTree.cs" />
<Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
<BootstrapperPackage Include=".NETFramework,Version=v4.0">
<Visible>False</Visible>
<ProductName>Microsoft .NET Framework 4 %28x86 and x64%29</ProductName>
<Install>true</Install>
</BootstrapperPackage>
<BootstrapperPackage Include="Microsoft.Net.Client.3.5">
<Visible>False</Visible>
<ProductName>.NET Framework 3.5 SP1 Client Profile</ProductName>
<Install>false</Install>
</BootstrapperPackage>
<BootstrapperPackage Include="Microsoft.Net.Framework.3.5.SP1">
<Visible>False</Visible>
<ProductName>.NET Framework 3.5 SP1</ProductName>
<Install>false</Install>
</BootstrapperPackage>
<BootstrapperPackage Include="Microsoft.Windows.Installer.3.1">
<Visible>False</Visible>
<ProductName>Windows Installer 3.1</ProductName>
<Install>true</Install>
</BootstrapperPackage>
</ItemGroup>
<ItemGroup>
<Content Include="Actor checklist.txt" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
</Target>
<Target Name="AfterBuild">
</Target>
-->
</Project>

View File

@ -1,34 +0,0 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26124.0
MinimumVisualStudioVersion = 15.0.26124.0
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "actorcompiler", "actorcompiler.csproj", "{0ECC1314-3FC2-458D-8E41-B50B4EA24E51}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Debug|x64 = Debug|x64
Debug|x86 = Debug|x86
Release|Any CPU = Release|Any CPU
Release|x64 = Release|x64
Release|x86 = Release|x86
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{0ECC1314-3FC2-458D-8E41-B50B4EA24E51}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0ECC1314-3FC2-458D-8E41-B50B4EA24E51}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0ECC1314-3FC2-458D-8E41-B50B4EA24E51}.Debug|x64.ActiveCfg = Debug|Any CPU
{0ECC1314-3FC2-458D-8E41-B50B4EA24E51}.Debug|x64.Build.0 = Debug|Any CPU
{0ECC1314-3FC2-458D-8E41-B50B4EA24E51}.Debug|x86.ActiveCfg = Debug|Any CPU
{0ECC1314-3FC2-458D-8E41-B50B4EA24E51}.Debug|x86.Build.0 = Debug|Any CPU
{0ECC1314-3FC2-458D-8E41-B50B4EA24E51}.Release|Any CPU.ActiveCfg = Debug|Any CPU
{0ECC1314-3FC2-458D-8E41-B50B4EA24E51}.Release|Any CPU.Build.0 = Debug|Any CPU
{0ECC1314-3FC2-458D-8E41-B50B4EA24E51}.Release|x64.ActiveCfg = Debug|Any CPU
{0ECC1314-3FC2-458D-8E41-B50B4EA24E51}.Release|x64.Build.0 = Debug|Any CPU
{0ECC1314-3FC2-458D-8E41-B50B4EA24E51}.Release|x86.ActiveCfg = Debug|Any CPU
{0ECC1314-3FC2-458D-8E41-B50B4EA24E51}.Release|x86.Build.0 = Debug|Any CPU
EndGlobalSection
EndGlobal

View File

@ -26,27 +26,6 @@
#include <stdarg.h>
#include <cinttypes>
thread_local Reference<ActorLineage> currentLineage;
WriteOnlyVariable<ActorLineage, unsigned> currentLineageThreadSafe;
LineagePropertiesBase::~LineagePropertiesBase() {}
ActorLineage::ActorLineage() : properties(), parent(currentLineage) {}
ActorLineage::~ActorLineage() {
for (auto ptr : properties) {
delete ptr.second;
}
}
using namespace std::literals;
const std::string_view StackLineage::name = "StackLineage"sv;
std::vector<StringRef> getActorStackTrace() {
return currentLineage->stack(&StackLineage::actorName);
}
#if (defined(__linux__) || defined(__FreeBSD__)) && defined(__AVX__) && !defined(MEMORY_SANITIZER)
// For benchmarking; need a version of rte_memcpy that doesn't live in the same compilation unit as the test.
void* rte_memcpy_noinline(void* __restrict __dest, const void* __restrict __src, size_t __n) {

View File

@ -20,8 +20,6 @@
#ifndef FLOW_FLOW_H
#define FLOW_FLOW_H
#include "flow/Arena.h"
#include "flow/FastRef.h"
#pragma once
#pragma warning(disable : 4244 4267) // SOMEDAY: Carefully check for integer overflow issues (e.g. size_t to int
@ -31,18 +29,14 @@
#include <vector>
#include <queue>
#include <stack>
#include <map>
#include <unordered_map>
#include <set>
#include <functional>
#include <iostream>
#include <string>
#include <string_view>
#include <utility>
#include <algorithm>
#include <memory>
#include <mutex>
#include "flow/Platform.h"
#include "flow/FastAlloc.h"
@ -52,7 +46,6 @@
#include "flow/ThreadPrimitives.h"
#include "flow/network.h"
#include "flow/FileIdentifier.h"
#include "flow/WriteOnlySet.h"
#include <boost/version.hpp>
@ -427,127 +420,6 @@ struct SingleCallback {
}
};
struct LineagePropertiesBase {
virtual ~LineagePropertiesBase();
};
// helper class to make implementation of LineageProperties easier
template <class Derived>
struct LineageProperties : LineagePropertiesBase {
// Contract:
//
// StringRef name = "SomeUniqueName"_str;
// this has to be implemented by subclasses
// but can't be made virtual.
// A user should implement this for any type
// within the properies class.
template <class Value>
bool isSet(Value Derived::*member) const {
return true;
}
};
struct ActorLineage : ThreadSafeReferenceCounted<ActorLineage> {
friend class LocalLineage;
private:
std::unordered_map<std::string_view, LineagePropertiesBase*> properties;
Reference<ActorLineage> parent;
mutable std::mutex mutex;
using Lock = std::unique_lock<std::mutex>;
public:
ActorLineage();
~ActorLineage();
bool isRoot() const {
Lock _{ mutex };
return parent.getPtr() == nullptr;
}
void makeRoot() {
Lock _{ mutex };
parent.clear();
}
template <class T, class V>
V& modify(V T::*member) {
Lock _{ mutex };
auto& res = properties[T::name];
if (!res) {
res = new T{};
}
T* map = static_cast<T*>(res);
return map->*member;
}
template <class T, class V>
std::optional<V> get(V T::*member) const {
Lock _{ mutex };
auto current = this;
while (current != nullptr) {
auto iter = current->properties.find(T::name);
if (iter != current->properties.end()) {
T const& map = static_cast<T const&>(*iter->second);
if (map.isSet(member)) {
return map.*member;
}
}
current = current->parent.getPtr();
}
return std::optional<V>{};
}
template <class T, class V>
std::vector<V> stack(V T::*member) const {
Lock _{ mutex };
auto current = this;
std::vector<V> res;
while (current != nullptr) {
auto iter = current->properties.find(T::name);
if (iter != current->properties.end()) {
T const& map = static_cast<T const&>(*iter->second);
if (map.isSet(member)) {
res.push_back(map.*member);
}
}
current = current->parent.getPtr();
}
return res;
}
};
extern thread_local Reference<ActorLineage> currentLineage;
extern WriteOnlyVariable<ActorLineage, unsigned> currentLineageThreadSafe;
// This class can be used in order to modify all lineage properties
// of actors created within a (non-actor) scope
struct LocalLineage {
Reference<ActorLineage> lineage = Reference<ActorLineage>{ new ActorLineage() };
Reference<ActorLineage> oldLineage;
LocalLineage() {
oldLineage = currentLineage;
currentLineage = lineage;
currentLineageThreadSafe.replace(lineage);
}
~LocalLineage() {
currentLineage = oldLineage;
currentLineageThreadSafe.replace(oldLineage);
}
};
struct restore_lineage {
Reference<ActorLineage> prev;
restore_lineage() : prev(currentLineage) {}
~restore_lineage() {
currentLineage = prev;
currentLineageThreadSafe.replace(prev);
}
};
struct StackLineage : LineageProperties<StackLineage> {
static const std::string_view name;
StringRef actorName;
};
extern std::vector<StringRef> getActorStackTrace();
// SAV is short for Single Assignment Variable: It can be assigned for only once!
template <class T>
struct SAV : private Callback<T>, FastAllocated<SAV<T>> {
@ -589,9 +461,8 @@ public:
ASSERT(canBeSet());
new (&value_storage) T(std::forward<U>(value));
this->error_state = Error::fromCode(SET_ERROR_CODE);
while (Callback<T>::next != this) {
while (Callback<T>::next != this)
Callback<T>::next->fire(this->value());
}
}
void send(Never) {
@ -602,9 +473,8 @@ public:
void sendError(Error err) {
ASSERT(canBeSet() && int16_t(err.code()) > 0);
this->error_state = err;
while (Callback<T>::next != this) {
while (Callback<T>::next != this)
Callback<T>::next->error(err);
}
}
template <class U>
@ -753,9 +623,8 @@ struct NotifiedQueue : private SingleCallback<T>, FastAllocated<NotifiedQueue<T>
return;
this->error = err;
if (SingleCallback<T>::next != this) {
if (SingleCallback<T>::next != this)
SingleCallback<T>::next->error(err);
}
}
void addPromiseRef() { promises++; }
@ -1123,73 +992,36 @@ static inline void destruct(T& t) {
template <class ReturnValue>
struct Actor : SAV<ReturnValue> {
Reference<ActorLineage> lineage = Reference<ActorLineage>{ new ActorLineage() };
int8_t actor_wait_state; // -1 means actor is cancelled; 0 means actor is not waiting; 1-N mean waiting in callback
// group #
Actor() : SAV<ReturnValue>(1, 1), actor_wait_state(0) {
/*++actorCount;*/
currentLineage = lineage;
currentLineageThreadSafe.replace(lineage);
Actor() : SAV<ReturnValue>(1, 1), actor_wait_state(0) { /*++actorCount;*/
}
//~Actor() { --actorCount; }
Reference<ActorLineage> setLineage() {
auto res = currentLineage;
currentLineage = lineage;
currentLineageThreadSafe.replace(lineage);
return res;
}
};
template <>
struct Actor<void> {
// This specialization is for a void actor (one not returning a future, hence also uncancellable)
Reference<ActorLineage> lineage = Reference<ActorLineage>{ new ActorLineage() };
int8_t actor_wait_state; // 0 means actor is not waiting; 1-N mean waiting in callback group #
Actor() : actor_wait_state(0) {
/*++actorCount;*/
currentLineage = lineage;
currentLineageThreadSafe.replace(lineage);
Actor() : actor_wait_state(0) { /*++actorCount;*/
}
//~Actor() { --actorCount; }
Reference<ActorLineage> setLineage() {
auto res = currentLineage;
currentLineage = lineage;
currentLineageThreadSafe.replace(lineage);
return res;
}
};
template <class ActorType, int CallbackNumber, class ValueType>
struct ActorCallback : Callback<ValueType> {
virtual void fire(ValueType const& value) override {
auto _ = static_cast<ActorType*>(this)->setLineage();
static_cast<ActorType*>(this)->a_callback_fire(this, value);
}
virtual void error(Error e) override {
auto _ = static_cast<ActorType*>(this)->setLineage();
static_cast<ActorType*>(this)->a_callback_error(this, e);
}
void fire(ValueType const& value) override { static_cast<ActorType*>(this)->a_callback_fire(this, value); }
void error(Error e) override { static_cast<ActorType*>(this)->a_callback_error(this, e); }
};
template <class ActorType, int CallbackNumber, class ValueType>
struct ActorSingleCallback : SingleCallback<ValueType> {
void fire(ValueType const& value) override {
auto _ = static_cast<ActorType*>(this)->setLineage();
static_cast<ActorType*>(this)->a_callback_fire(this, value);
}
void fire(ValueType&& value) override {
auto _ = static_cast<ActorType*>(this)->setLineage();
static_cast<ActorType*>(this)->a_callback_fire(this, std::move(value));
}
void error(Error e) override {
auto _ = static_cast<ActorType*>(this)->setLineage();
static_cast<ActorType*>(this)->a_callback_error(this, e);
}
void fire(ValueType const& value) override { static_cast<ActorType*>(this)->a_callback_fire(this, value); }
void fire(ValueType&& value) override { static_cast<ActorType*>(this)->a_callback_fire(this, std::move(value)); }
void error(Error e) override { static_cast<ActorType*>(this)->a_callback_error(this, e); }
};
inline double now() {
return g_network->now();

View File

@ -1547,10 +1547,6 @@ struct YieldedFutureActor : SAV<Void>, ActorCallback<YieldedFutureActor, 1, Void
void destroy() override { delete this; }
Reference<ActorLineage> setLineage() {
return currentLineage;
}
void a_callback_fire(ActorCallback<YieldedFutureActor, 1, Void>*, Void) {
if (int16_t(in_error_state.code()) == UNSET_ERROR_CODE) {
in_error_state = Error::fromCode(SET_ERROR_CODE);

View File

@ -35,7 +35,6 @@
#include "flow/Arena.h"
#include "flow/IRandom.h"
#include "flow/Trace.h"
#include "flow/WriteOnlySet.h"
enum class TaskPriority {
Max = 1000000,
@ -560,9 +559,6 @@ public:
// returns false.
virtual bool checkRunnable() = 0;
// Returns the shared memory data structure used to store actor lineages.
virtual ActorLineageSet& getActorLineageSet() = 0;
virtual ProtocolVersion protocolVersion() = 0;
// Shorthand for transport().getLocalAddress()

View File

@ -1,237 +0,0 @@
/*
* (C) Copyright 2015 ETH Zurich Systems Group (http://www.systems.ethz.ch/) and others.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* Markus Pilman <mpilman@inf.ethz.ch>
* Simon Loesing <sloesing@inf.ethz.ch>
* Thomas Etter <etterth@gmail.com>
* Kevin Bocksrocker <kevin.bocksrocker@gmail.com>
* Lucas Braun <braunl@inf.ethz.ch>
*/
#pragma once
#include <mutex>
#include <memory>
#include <cstdlib>
#include <cassert>
namespace crossbow {
/**
* @brief A mock mutex for disabling locking in the singleton
*
* This class implements the mutex concept with empty methods.
* This can be used to disable synchronization in the singleton
* holder.
*/
struct no_locking {
void lock() {}
void unlock() {}
bool try_lock() { return true; }
};
template <typename T>
struct create_static {
static constexpr bool supports_recreation = false;
union max_align {
char t_[sizeof(T)];
short int short_int_;
long int long_int_;
float float_;
double double_;
long double longDouble_;
struct Test;
int Test::*pMember_;
int (Test::*pMemberFn_)(int);
};
static T* create() {
static max_align static_memory_;
return new (&static_memory_) T;
}
static void destroy(T* ptr) { ptr->~T(); }
};
template <typename T>
struct create_using_new {
static constexpr bool supports_recreation = true;
static T* create() { return new T; };
static void destroy(T* ptr) { delete ptr; }
};
template <typename T>
struct create_using_malloc {
static constexpr bool supports_recreation = true;
static T* create() {
void* p = std::malloc(sizeof(T));
if (!p)
return nullptr;
return new (p) T;
}
static void destroy(T* ptr) {
ptr->~T();
free(ptr);
}
};
template <class T, class allocator>
struct create_using {
static constexpr bool supports_recreation = true;
static allocator alloc_;
static T* create() {
T* p = alloc_.allocate(1);
if (!p)
return nullptr;
alloc_.construct(p);
return p;
};
static void destroy(T* ptr) {
alloc_.destroy(ptr);
alloc_.deallocate(ptr, 1);
}
};
template <typename T>
struct default_lifetime {
static void schedule_destruction(T*, void (*func)()) { std::atexit(func); }
static void on_dead_ref() { throw std::logic_error("Dead reference detected"); }
};
template <typename T>
struct phoenix_lifetime {
static void schedule_destruction(T*, void (*func)()) { std::atexit(func); }
static void on_dead_ref() {}
};
template <typename T>
struct infinite_lifetime {
static void schedule_destruction(T*, void (*)()) {}
static void on_dead_ref() {}
};
template <typename T>
struct lifetime_traits {
static constexpr bool supports_recreation = true;
};
template <typename T>
struct lifetime_traits<infinite_lifetime<T>> {
static constexpr bool supports_recreation = false;
};
template <typename T>
struct lifetime_traits<default_lifetime<T>> {
static constexpr bool supports_recreation = false;
};
template <typename Type,
typename Create = create_static<Type>,
typename LifetimePolicy = default_lifetime<Type>,
typename Mutex = std::mutex>
class singleton {
public:
typedef Type value_type;
typedef Type* pointer;
typedef const Type* const_pointer;
typedef const Type& const_reference;
typedef Type& reference;
private:
static bool destroyed_;
static pointer instance_;
static Mutex mutex_;
static void destroy() {
if (destroyed_)
return;
Create::destroy(instance_);
instance_ = nullptr;
destroyed_ = true;
}
public:
static reference instance() {
static_assert(Create::supports_recreation || !lifetime_traits<LifetimePolicy>::supports_recreation,
"The creation policy does not support instance recreation, while the lifetime does support it.");
if (!instance_) {
std::lock_guard<Mutex> l(mutex_);
if (!instance_) {
if (destroyed_) {
destroyed_ = false;
LifetimePolicy::on_dead_ref();
}
instance_ = Create::create();
LifetimePolicy::schedule_destruction(instance_, &destroy);
}
}
return *instance_;
}
/**
* WARNING: DO NOT EXECUTE THIS MULTITHREADED!!!
*/
static void destroy_instance() {
if (instance_) {
std::lock_guard<Mutex> l(mutex_);
destroy();
}
}
public:
pointer operator->() {
if (!instance_) {
instance();
}
return instance_;
}
reference operator*() {
if (!instance_) {
instance();
}
return *instance_;
}
const_pointer operator->() const {
if (!instance_) {
instance();
}
return instance_;
}
const_reference operator*() const {
if (!instance_) {
instance();
}
return *instance_;
}
};
template <typename T, typename C, typename L, typename M>
bool singleton<T, C, L, M>::destroyed_ = false;
template <typename T, typename C, typename L, typename M>
typename singleton<T, C, L, M>::pointer singleton<T, C, L, M>::instance_ = nullptr;
template <typename T, typename C, typename L, typename M>
M singleton<T, C, L, M>::mutex_;
} // namespace crossbow

View File

@ -38,7 +38,7 @@ cluster_file = {etcdir}/fdb.cluster
command = {fdbserver_bin}
public_address = auto:$ID
listen_address = public
datadir = {datadir}/$ID
datadir = {datadir}
logdir = {logdir}
# logsize = 10MiB
# maxlogssize = 100MiB