collect and serialize

This commit is contained in:
Markus Pilman 2021-04-09 14:25:11 -06:00
parent ab3efd7d9d
commit 2064903705
13 changed files with 606 additions and 30 deletions

View File

@ -152,6 +152,7 @@ if(CMAKE_SYSTEM_NAME STREQUAL "FreeBSD")
endif() endif()
include(CompileBoost) include(CompileBoost)
include(GetMsgpack)
add_subdirectory(flow) add_subdirectory(flow)
add_subdirectory(fdbrpc) add_subdirectory(fdbrpc)
add_subdirectory(fdbclient) add_subdirectory(fdbclient)

16
cmake/GetMsgpack.cmake Normal file
View File

@ -0,0 +1,16 @@
find_package(msgpack 3.3.0 EXACT QUIET CONFIG)
add_library(msgpack INTERFACE)
if(msgpack_FOUND)
target_link_libraries(msgpack INTERFACE msgpackc-cxx)
else()
include(ExternalProject)
ExternalProject_add(msgpackProject
URL "https://github.com/msgpack/msgpack-c/releases/download/cpp-3.3.0/msgpack-3.3.0.tar.gz"
URL_HASH SHA256=6e114d12a5ddb8cb11f669f83f32246e484a8addd0ce93f274996f1941c1f07b
CONFIGURE_COMMAND BUILD_COMMAND INSTALL_COMMAND)
ExternalProject_Get_property(msgpackProject SOURCE_DIR)
target_include_directories(msgpack SYSTEM INTERFACE "${SOURCE_DIR}/include")
endif()

View File

@ -0,0 +1,183 @@
/*
* ActorLineageProfiler.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-20201 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/singleton.h"
#include "fdbclient/ActorLineageProfiler.h"
#include <msgpack.hpp>
#include <memory>
#include <boost/endian/conversion.hpp>
using namespace std::literals;
class Packer : public msgpack::packer<msgpack::sbuffer> {
struct visitor_t {
using VisitorMap = std::unordered_map<std::type_info, std::function<void(std::any const&, Packer& packer)>>;
VisitorMap visitorMap;
template <class T>
static void any_visitor(std::any const& val, Packer& packer) {
const T& v = std::any_cast<const T&>(val);
packer.pack(v);
}
template <class... Args>
struct populate_visitor_map;
template <class Head, class... Tail>
struct populate_visitor_map<Head, Tail...> {
static void populate(VisitorMap& map) {
map.emplace(any_visitor<Head>);
populate_visitor_map<Tail...>::populate(map);
}
};
template <>
struct populate_visitor_map<> {
static void populate(VisitorMap&) {}
};
visitor_t() { populate_visitor_map<int64_t, uint64_t, bool, float, double>::populate(visitorMap); }
void visit(const std::any& val, Packer& packer) {
auto iter = visitorMap.find(val.type());
if (iter == visitorMap.end()) {
// TODO: trace error
} else {
iter->second(val, packer);
}
}
};
msgpack::sbuffer sbuffer;
// Initializing visitor_t involves building a type-map. As this is a relatively expensive operation, we don't want
// to do this each time we create a Packer object. So visitor_t is a stateless class and we only use it as a
// visitor.
crossbow::singleton<visitor_t> visitor;
public:
Packer() : msgpack::packer<msgpack::sbuffer>(sbuffer) {}
void pack(std::any const& val) { visitor->visit(val, *this); }
void pack(bool val) {
if (val) {
pack_true();
} else {
pack_false();
}
}
void pack(uint64_t val) {
if (val <= std::numeric_limits<uint8_t>::max()) {
pack_uint8(uint8_t(val));
} else if (val <= std::numeric_limits<uint16_t>::max()) {
pack_uint16(uint16_t(val));
} else if (val <= std::numeric_limits<uint32_t>::max()) {
pack_uint32(uint32_t(val));
} else {
pack_uint64(val);
}
}
void pack(int64_t val) {
if (val >= 0) {
this->pack(uint64_t(val));
} else if (val >= std::numeric_limits<uint8_t>::min()) {
pack_int8(int8_t(val));
} else if (val >= std::numeric_limits<uint16_t>::min()) {
pack_int8(int16_t(val));
} else if (val >= std::numeric_limits<uint32_t>::min()) {
pack_int8(int32_t(val));
} else if (val >= std::numeric_limits<uint64_t>::min()) {
pack_int8(int64_t(val));
}
}
void pack(float val) { pack_float(val); }
void pack(double val) { pack_double(val); }
void pack(std::string const& str) {
pack_str(str.size());
pack_str_body(str.data(), str.size());
}
void pack(std::string_view val) {
pack_str(val.size());
pack_str_body(val.data(), val.size());
}
template <class K, class V>
void pack(std::map<K, V> const& map) {
pack_map(map.size());
for (const auto& p : map) {
pack(p.first);
pack(p.second);
}
}
template <class T>
void pack(std::vector<T> const& val) {
pack_array(val.size());
for (const auto& v : val) {
pack(v);
}
}
std::shared_ptr<Sample> done(double time) {
auto res = std::make_shared<Sample>();
res->time = time;
res->size = sbuffer.size();
res->data = sbuffer.release();
return res;
}
};
IALPCollectorBase::IALPCollectorBase() {
SampleCollector::instance().addCollector(this);
}
std::map<std::string_view, std::any> SampleCollectorT::collect(ActorLineage* lineage) {
std::map<std::string_view, std::any> out;
for (auto& collector : collectors) {
auto val = collector->collect(lineage);
if (val.has_value()) {
out[collector->name()] = val.value();
}
}
return out;
}
std::shared_ptr<Sample> SampleCollectorT::collect() {
Packer packer;
std::map<std::string_view, std::any> res;
double time = g_network->now();
res["time"sv] = time;
for (auto& p : getSamples) {
std::vector<std::map<std::string_view, std::any>> samples;
auto sampleVec = p.second();
for (auto& val : sampleVec) {
auto m = collect(val.getPtr());
if (!m.empty()) {
samples.emplace_back(std::move(m));
}
}
if (!samples.empty()) {
res[to_string(p.first)] = samples;
}
}
packer.pack(res);
return packer.done(time);
}

View File

@ -0,0 +1,80 @@
/*
* ActorLineageProfiler.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-20201 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <optional>
#include <string>
#include <any>
#include <vector>
#include "flow/singleton.h"
#include "flow/flow.h"
struct IALPCollectorBase {
virtual std::optional<std::any> collect(ActorLineage*) = 0;
virtual const std::string_view& name() = 0;
IALPCollectorBase();
};
template <class T>
struct IALPCollector : IALPCollectorBase {
const std::string_view& name() override {
static std::string_view res;
if (res == "") {
res = T::name;
}
return res;
}
};
enum class WaitState { Running, DiskIO };
std::string_view to_string(WaitState w) {
switch (w) {
case WaitState::Running:
return "Running";
case WaitState::DiskIO:
return "DiskIO";
}
}
struct Sample : std::enable_shared_from_this<Sample> {
double time = 0.0;
unsigned size = 0u;
char* data = nullptr;
~Sample() { ::free(data); }
};
class SampleCollectorT {
public: // Types
friend class crossbow::singleton<SampleCollectorT>;
using Getter = std::function<std::vector<Reference<ActorLineage>>()>;
private:
std::vector<IALPCollectorBase*> collectors;
std::map<WaitState, Getter> getSamples;
SampleCollectorT() {}
public:
void addCollector(IALPCollectorBase* collector) { collectors.push_back(collector); }
std::map<std::string_view, std::any> collect(ActorLineage* lineage);
std::shared_ptr<Sample> collect();
};
using SampleCollector = crossbow::singleton<SampleCollectorT>;

View File

@ -1,4 +1,6 @@
set(FDBCLIENT_SRCS set(FDBCLIENT_SRCS
ActorLineageProfiler.h
ActorLineageProfiler.cpp
AsyncFileS3BlobStore.actor.cpp AsyncFileS3BlobStore.actor.cpp
AsyncFileS3BlobStore.actor.h AsyncFileS3BlobStore.actor.h
AsyncTaskThread.actor.cpp AsyncTaskThread.actor.cpp
@ -137,8 +139,7 @@ endif()
add_flow_target(STATIC_LIBRARY NAME fdbclient SRCS ${FDBCLIENT_SRCS} ADDL_SRCS ${options_srcs}) add_flow_target(STATIC_LIBRARY NAME fdbclient SRCS ${FDBCLIENT_SRCS} ADDL_SRCS ${options_srcs})
add_dependencies(fdbclient fdboptions) add_dependencies(fdbclient fdboptions)
target_link_libraries(fdbclient PUBLIC fdbrpc msgpack)
if(BUILD_AZURE_BACKUP) if(BUILD_AZURE_BACKUP)
target_link_libraries(fdbclient PUBLIC fdbrpc PRIVATE curl uuid azure-storage-lite) target_link_libraries(fdbclient PRIVATE curl uuid azure-storage-lite)
else()
target_link_libraries(fdbclient PUBLIC fdbrpc)
endif() endif()

View File

@ -20,4 +20,6 @@
#include "fdbserver/RoleLineage.actor.h" #include "fdbserver/RoleLineage.actor.h"
StringRef RoleLineage::name = "RoleLineage"_sr; using namespace std::literals;
std::string_view RoleLineage::name = "RoleLineage"sv;

View File

@ -21,30 +21,47 @@
#pragma once #pragma once
#include "flow/flow.h" #include "flow/flow.h"
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_ROLE_LINEAGE_ACTOR_G_H) #if defined(NO_INTELLISENSE) && !defined(FDBSERVER_ROLE_LINEAGE_ACTOR_G_H)
# define FDBSERVER_ROLE_LINEAGE_ACTOR_G_H #define FDBSERVER_ROLE_LINEAGE_ACTOR_G_H
# include "fdbserver/RoleLineage.actor.g.h" #include "fdbserver/RoleLineage.actor.g.h"
#elif !defined(FDBSERVER_ROLE_LINEAGE_ACTOR_H) #elif !defined(FDBSERVER_ROLE_LINEAGE_ACTOR_H)
# define FDBSERVER_ROLE_LINEAGE_ACTOR_H #define FDBSERVER_ROLE_LINEAGE_ACTOR_H
#include "flow/singleton.h"
#include "fdbrpc/Locality.h" #include "fdbrpc/Locality.h"
#include "fdbclient/ActorLineageProfiler.h"
#include "fdbserver/WorkerInterface.actor.h"
#include <string_view>
#include <msgpack.hpp>
#include <any>
#include "flow/actorcompiler.h" // This must be the last include #include "flow/actorcompiler.h" // This must be the last include
struct RoleLineage : LineageProperties<RoleLineage> { struct RoleLineage : LineageProperties<RoleLineage> {
static StringRef name; static std::string_view name;
ProcessClass::ClusterRole role = ProcessClass::NoRole; ProcessClass::ClusterRole role = ProcessClass::NoRole;
bool isSet(ProcessClass::ClusterRole RoleLineage::*member) const { bool isSet(ProcessClass::ClusterRole RoleLineage::*member) const { return this->*member != ProcessClass::NoRole; }
return this->*member != ProcessClass::NoRole; };
}
struct RoleLineageCollector : IALPCollector<RoleLineage> {
RoleLineageCollector() : IALPCollector() {}
std::optional<std::any> collect(ActorLineage* lineage) override {
auto res = lineage->get(&RoleLineage::role);
if (res.has_value()) {
return Role::get(res.value()).abbreviation;
} else {
return std::optional<std::any>();
}
}
}; };
// creates a new root and sets the role lineage // creates a new root and sets the role lineage
ACTOR template<class Fun> ACTOR template <class Fun>
Future<decltype(std::declval<Fun>()())> runInRole(Fun fun, ProcessClass::ClusterRole role) { Future<decltype(std::declval<Fun>()())> runInRole(Fun fun, ProcessClass::ClusterRole role) {
currentLineage->makeRoot(); currentLineage->makeRoot();
currentLineage->modify(&RoleLineage::role) = role; currentLineage->modify(&RoleLineage::role) = role;
decltype(std::declval<Fun>()()) res = wait(fun()); decltype(std::declval<Fun>()()) res = wait(fun());
return res; return res;
} }
#endif #endif

View File

@ -787,6 +787,40 @@ struct Role {
std::string abbreviation; std::string abbreviation;
bool includeInTraceRoles; bool includeInTraceRoles;
static const Role& get(ProcessClass::ClusterRole role) {
switch (role) {
case ProcessClass::Storage:
return STORAGE_SERVER;
case ProcessClass::TLog:
return TRANSACTION_LOG;
case ProcessClass::CommitProxy:
return COMMIT_PROXY;
case ProcessClass::GrvProxy:
return GRV_PROXY;
case ProcessClass::Master:
return MASTER;
case ProcessClass::Resolver:
return RESOLVER;
case ProcessClass::LogRouter:
return LOG_ROUTER;
case ProcessClass::ClusterController:
return CLUSTER_CONTROLLER;
case ProcessClass::DataDistributor:
return DATA_DISTRIBUTOR;
case ProcessClass::Ratekeeper:
return RATEKEEPER;
case ProcessClass::StorageCache:
return STORAGE_CACHE;
case ProcessClass::Backup:
return BACKUP;
case ProcessClass::Worker:
return WORKER;
case ProcessClass::NoRole:
ASSERT(false);
throw internal_error();
}
}
bool operator==(const Role& r) const { return roleName == r.roleName; } bool operator==(const Role& r) const { return roleName == r.roleName; }
bool operator!=(const Role& r) const { return !(*this == r); } bool operator!=(const Role& r) const { return !(*this == r); }

View File

@ -226,7 +226,9 @@ public:
TaskPriority currentTaskID; TaskPriority currentTaskID;
uint64_t tasksIssued; uint64_t tasksIssued;
TDMetricCollection tdmetrics; TDMetricCollection tdmetrics;
double currentTime; // we read now() from a different thread. On Intel, reading a double is atomic anyways, but on other platforms it's
// not. For portability this should be atomic
std::atomic<double> currentTime;
// May be accessed off the network thread, e.g. by onMainThread // May be accessed off the network thread, e.g. by onMainThread
std::atomic<bool> stopped; std::atomic<bool> stopped;
mutable std::map<IPAddress, bool> addressOnHostCache; mutable std::map<IPAddress, bool> addressOnHostCache;

View File

@ -3685,8 +3685,8 @@ void* sampleThread(void* arg) {
printf("Currently running actor lineage (%p):\n", actorLineage.getPtr()); printf("Currently running actor lineage (%p):\n", actorLineage.getPtr());
auto stack = actorLineage->stack(&StackLineage::actorName); auto stack = actorLineage->stack(&StackLineage::actorName);
while (!stack.empty()) { while (!stack.empty()) {
printf("%s ", stack.top()); printf("%s ", stack.back());
stack.pop(); stack.pop_back();
} }
printf("\n"); printf("\n");
@ -3697,8 +3697,8 @@ void* sampleThread(void* arg) {
for (auto actorLineage : diskAlps) { for (auto actorLineage : diskAlps) {
auto stack = actorLineage->stack(&StackLineage::actorName); auto stack = actorLineage->stack(&StackLineage::actorName);
while (!stack.empty()) { while (!stack.empty()) {
printf("%s ", stack.top()); printf("%s ", stack.back());
stack.pop(); stack.pop_back();
} }
printf("\n"); printf("\n");
} }

View File

@ -39,9 +39,11 @@ ActorLineage::~ActorLineage() {
} }
} }
StringRef StackLineage::name = "StackLineage"_sr; using namespace std::literals;
std::stack<StringRef> getActorStackTrace() { std::string_view StackLineage::name = "StackLineage"sv;
std::vector<StringRef> getActorStackTrace() {
return currentLineage->stack(&StackLineage::actorName); return currentLineage->stack(&StackLineage::actorName);
} }

View File

@ -38,6 +38,7 @@
#include <functional> #include <functional>
#include <iostream> #include <iostream>
#include <string> #include <string>
#include <string_view>
#include <utility> #include <utility>
#include <algorithm> #include <algorithm>
#include <memory> #include <memory>
@ -450,7 +451,7 @@ struct ActorLineage : ReferenceCounted<ActorLineage> {
friend class LocalLineage; friend class LocalLineage;
private: private:
std::unordered_map<StringRef, LineagePropertiesBase*> properties; std::unordered_map<std::string_view, LineagePropertiesBase*> properties;
Reference<ActorLineage> parent; Reference<ActorLineage> parent;
public: public:
@ -483,15 +484,15 @@ public:
return std::optional<V>{}; return std::optional<V>{};
} }
template <class T, class V> template <class T, class V>
std::stack<V> stack(V T::*member) const { std::vector<V> stack(V T::*member) const {
auto current = this; auto current = this;
std::stack<V> res; std::vector<V> res;
while (current != nullptr) { while (current != nullptr) {
auto iter = current->properties.find(T::name); auto iter = current->properties.find(T::name);
if (iter != current->properties.end()) { if (iter != current->properties.end()) {
T const& map = static_cast<T const&>(*iter->second); T const& map = static_cast<T const&>(*iter->second);
if (map.isSet(member)) { if (map.isSet(member)) {
res.push(map.*member); res.push_back(map.*member);
} }
} }
current = current->parent.getPtr(); current = current->parent.getPtr();
@ -529,11 +530,11 @@ struct restore_lineage {
}; };
struct StackLineage : LineageProperties<StackLineage> { struct StackLineage : LineageProperties<StackLineage> {
static StringRef name; static const std::string_view name;
StringRef actorName; StringRef actorName;
}; };
extern std::stack<StringRef> getActorStackTrace(); extern std::vector<StringRef> getActorStackTrace();
// SAV is short for Single Assignment Variable: It can be assigned for only once! // SAV is short for Single Assignment Variable: It can be assigned for only once!
template <class T> template <class T>

237
flow/singleton.h Normal file
View File

@ -0,0 +1,237 @@
/*
* (C) Copyright 2015 ETH Zurich Systems Group (http://www.systems.ethz.ch/) and others.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* Markus Pilman <mpilman@inf.ethz.ch>
* Simon Loesing <sloesing@inf.ethz.ch>
* Thomas Etter <etterth@gmail.com>
* Kevin Bocksrocker <kevin.bocksrocker@gmail.com>
* Lucas Braun <braunl@inf.ethz.ch>
*/
#pragma once
#include <mutex>
#include <memory>
#include <cstdlib>
#include <cassert>
namespace crossbow {
/**
* @brief A mock mutex for disabling locking in the singleton
*
* This class implements the mutex concept with empty methods.
* This can be used to disable synchronization in the singleton
* holder.
*/
struct no_locking {
void lock() {}
void unlock() {}
bool try_lock() { return true; }
};
template <typename T>
struct create_static {
static constexpr bool supports_recreation = false;
union max_align {
char t_[sizeof(T)];
short int short_int_;
long int long_int_;
float float_;
double double_;
long double longDouble_;
struct Test;
int Test::*pMember_;
int (Test::*pMemberFn_)(int);
};
static T* create() {
static max_align static_memory_;
return new (&static_memory_) T;
}
static void destroy(T* ptr) { ptr->~T(); }
};
template <typename T>
struct create_using_new {
static constexpr bool supports_recreation = true;
static T* create() { return new T; };
static void destroy(T* ptr) { delete ptr; }
};
template <typename T>
struct create_using_malloc {
static constexpr bool supports_recreation = true;
static T* create() {
void* p = std::malloc(sizeof(T));
if (!p)
return nullptr;
return new (p) T;
}
static void destroy(T* ptr) {
ptr->~T();
free(ptr);
}
};
template <class T, class allocator>
struct create_using {
static constexpr bool supports_recreation = true;
static allocator alloc_;
static T* create() {
T* p = alloc_.allocate(1);
if (!p)
return nullptr;
alloc_.construct(p);
return p;
};
static void destroy(T* ptr) {
alloc_.destroy(ptr);
alloc_.deallocate(ptr, 1);
}
};
template <typename T>
struct default_lifetime {
static void schedule_destruction(T*, void (*func)()) { std::atexit(func); }
static void on_dead_ref() { throw std::logic_error("Dead reference detected"); }
};
template <typename T>
struct phoenix_lifetime {
static void schedule_destruction(T*, void (*func)()) { std::atexit(func); }
static void on_dead_ref() {}
};
template <typename T>
struct infinite_lifetime {
static void schedule_destruction(T*, void (*)()) {}
static void on_dead_ref() {}
};
template <typename T>
struct lifetime_traits {
static constexpr bool supports_recreation = true;
};
template <typename T>
struct lifetime_traits<infinite_lifetime<T>> {
static constexpr bool supports_recreation = false;
};
template <typename T>
struct lifetime_traits<default_lifetime<T>> {
static constexpr bool supports_recreation = false;
};
template <typename Type,
typename Create = create_static<Type>,
typename LifetimePolicy = default_lifetime<Type>,
typename Mutex = std::mutex>
class singleton {
public:
typedef Type value_type;
typedef Type* pointer;
typedef const Type* const_pointer;
typedef const Type& const_reference;
typedef Type& reference;
private:
static bool destroyed_;
static pointer instance_;
static Mutex mutex_;
static void destroy() {
if (destroyed_)
return;
Create::destroy(instance_);
instance_ = nullptr;
destroyed_ = true;
}
public:
static reference instance() {
static_assert(Create::supports_recreation || !lifetime_traits<LifetimePolicy>::supports_recreation,
"The creation policy does not support instance recreation, while the lifetime does support it.");
if (!instance_) {
std::lock_guard<Mutex> l(mutex_);
if (!instance_) {
if (destroyed_) {
destroyed_ = false;
LifetimePolicy::on_dead_ref();
}
instance_ = Create::create();
LifetimePolicy::schedule_destruction(instance_, &destroy);
}
}
return *instance_;
}
/**
* WARNING: DO NOT EXECUTE THIS MULTITHREADED!!!
*/
static void destroy_instance() {
if (instance_) {
std::lock_guard<Mutex> l(mutex_);
destroy();
}
}
public:
pointer operator->() {
if (!instance_) {
instance();
}
return instance_;
}
reference operator*() {
if (!instance_) {
instance();
}
return *instance_;
}
const_pointer operator->() const {
if (!instance_) {
instance();
}
return instance_;
}
const_reference operator*() const {
if (!instance_) {
instance();
}
return *instance_;
}
};
template <typename T, typename C, typename L, typename M>
bool singleton<T, C, L, M>::destroyed_ = false;
template <typename T, typename C, typename L, typename M>
typename singleton<T, C, L, M>::pointer singleton<T, C, L, M>::instance_ = nullptr;
template <typename T, typename C, typename L, typename M>
M singleton<T, C, L, M>::mutex_;
} // namespace crossbow