This commit is contained in:
Lukas Joswiak 2021-06-11 13:01:32 -07:00
parent 3f4543f24a
commit 0301072690
10 changed files with 182 additions and 45 deletions

View File

@ -396,15 +396,15 @@ ACTOR Future<Void> fdbStatusStresser() {
std::unordered_map<std::string, std::function<Future<Void>()>> actors = { std::unordered_map<std::string, std::function<Future<Void>()>> actors = {
{ "timer", &simpleTimer }, // ./tutorial timer { "timer", &simpleTimer }, // ./tutorial timer
{ "promiseDemo", &promiseDemo }, // ./tutorial promiseDemo // { "promiseDemo", &promiseDemo }, // ./tutorial promiseDemo
{ "triggerDemo", &triggerDemo }, // ./tutorial triggerDemo // { "triggerDemo", &triggerDemo }, // ./tutorial triggerDemo
{ "echoServer", &echoServer }, // ./tutorial -p 6666 echoServer // { "echoServer", &echoServer }, // ./tutorial -p 6666 echoServer
{ "echoClient", &echoClient }, // ./tutorial -s 127.0.0.1:6666 echoClient // { "echoClient", &echoClient }, // ./tutorial -s 127.0.0.1:6666 echoClient
{ "kvStoreServer", &kvStoreServer }, // ./tutorial -p 6666 kvStoreServer // { "kvStoreServer", &kvStoreServer }, // ./tutorial -p 6666 kvStoreServer
{ "kvSimpleClient", &kvSimpleClient }, // ./tutorial -s 127.0.0.1:6666 kvSimpleClient // { "kvSimpleClient", &kvSimpleClient }, // ./tutorial -s 127.0.0.1:6666 kvSimpleClient
{ "multipleClients", &multipleClients }, // ./tutorial -s 127.0.0.1:6666 multipleClients // { "multipleClients", &multipleClients }, // ./tutorial -s 127.0.0.1:6666 multipleClients
{ "fdbClient", &fdbClient }, // ./tutorial -C $CLUSTER_FILE_PATH fdbClient // { "fdbClient", &fdbClient }, // ./tutorial -C $CLUSTER_FILE_PATH fdbClient
{ "fdbStatusStresser", &fdbStatusStresser } // { "fdbStatusStresser", &fdbStatusStresser }
}; // ./tutorial -C $CLUSTER_FILE_PATH fdbStatusStresser }; // ./tutorial -C $CLUSTER_FILE_PATH fdbStatusStresser
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {

View File

@ -29,6 +29,11 @@
using namespace std::literals; using namespace std::literals;
// TODO: For debugging, remove
LineageReference<ActorLineage>* curLineage() {
return currentLineage;
}
class Packer : public msgpack::packer<msgpack::sbuffer> { class Packer : public msgpack::packer<msgpack::sbuffer> {
struct visitor_t { struct visitor_t {
using VisitorMap = std::unordered_map<std::type_index, std::function<void(std::any const&, Packer& packer)>>; using VisitorMap = std::unordered_map<std::type_index, std::function<void(std::any const&, Packer& packer)>>;
@ -244,8 +249,15 @@ std::vector<std::shared_ptr<Sample>> SampleCollection_t::get(double from /*= 0.0
return res; return res;
} }
void sample(const Reference<ActorLineage>& lineage) { // void sample(const Reference<ActorLineage>& lineage) {
boost::asio::post(ActorLineageProfiler::instance().context(), [lineage]() { void sample(Reference<ActorLineage>* ptr) {
// boost::asio::post(ActorLineageProfiler::instance().context(), [lineage]() {
// SampleCollection::instance().collect(lineage);
// });
if (!ptr->isValid()) {
return;
}
boost::asio::post(ActorLineageProfiler::instance().context(), [lineage = Reference<ActorLineage>::addRef(ptr->getPtr())]() {
SampleCollection::instance().collect(lineage); SampleCollection::instance().collect(lineage);
}); });
} }

View File

@ -34,6 +34,9 @@
void samplingProfilerUpdateFrequency(std::optional<std::any> freq); void samplingProfilerUpdateFrequency(std::optional<std::any> freq);
void samplingProfilerUpdateWindow(std::optional<std::any> window); void samplingProfilerUpdateWindow(std::optional<std::any> window);
// TODO: For debugging, remove
LineageReference<ActorLineage>* curLineage();
struct IALPCollectorBase { struct IALPCollectorBase {
virtual std::optional<std::any> collect(ActorLineage*) = 0; virtual std::optional<std::any> collect(ActorLineage*) = 0;
virtual const std::string_view& name() = 0; virtual const std::string_view& name() = 0;

View File

@ -21,7 +21,7 @@
#include "fdbclient/StackLineage.h" #include "fdbclient/StackLineage.h"
std::vector<StringRef> getActorStackTrace() { std::vector<StringRef> getActorStackTrace() {
return currentLineage->stack(&StackLineage::actorName); return (*currentLineage)->stack(&StackLineage::actorName);
} }
namespace { namespace {

View File

@ -194,4 +194,14 @@ bool operator!=(const Reference<P>& lhs, const Reference<P>& rhs) {
return !(lhs == rhs); return !(lhs == rhs);
} }
template <class P>
class LineageReference : public Reference<P> {
// TODO: Make private
public:
LineageReference() : Reference<P>(nullptr), referencesSelf(false) {}
explicit LineageReference(P* ptr) : Reference<P>(ptr), referencesSelf(false) {}
LineageReference(const LineageReference& r) : Reference<P>(r), referencesSelf(false) {}
bool referencesSelf;
};
#endif #endif

View File

@ -1435,7 +1435,9 @@ void Net2::run() {
checkForSlowTask(tscBegin, timestampCounter(), taskEnd - taskBegin, TaskPriority::RunCycleFunction); checkForSlowTask(tscBegin, timestampCounter(), taskEnd - taskBegin, TaskPriority::RunCycleFunction);
} }
replaceLineage(Reference<ActorLineage>()); // replaceLineage(LineageReference<ActorLineage>());
// currentLineage = nullptr;
// replaceLineage(nullptr);
double sleepTime = 0; double sleepTime = 0;
bool b = ready.empty(); bool b = ready.empty();
if (b) { if (b) {

View File

@ -452,8 +452,8 @@ namespace actorcompiler
fullClassName, fullClassName,
string.Join(", ", actor.parameters.Select(p => p.name).ToArray())); string.Join(", ", actor.parameters.Select(p => p.name).ToArray()));
if (actor.IsCancellable()) // if (actor.IsCancellable())
writer.WriteLine("\trestore_lineage _;"); // writer.WriteLine("\trestore_lineage _;");
if (actor.returnType != null) if (actor.returnType != null)
writer.WriteLine("\treturn Future<{1}>({0});", newActor, actor.returnType); writer.WriteLine("\treturn Future<{1}>({0});", newActor, actor.returnType);
else else
@ -1288,6 +1288,8 @@ namespace actorcompiler
constructor.WriteLine("{"); constructor.WriteLine("{");
constructor.Indent(+1); constructor.Indent(+1);
ProbeEnter(constructor, actor.name); ProbeEnter(constructor, actor.name);
constructor.WriteLine("CurrentLineageReplace _(&this->lineage);");
// constructor.WriteLine("getCurrentLineage()->modify(&StackLineage::actorName) = LiteralStringRef(\"{0}\");", actor.name);
constructor.WriteLine("this->{0};", body.call()); constructor.WriteLine("this->{0};", body.call());
ProbeExit(constructor, actor.name); ProbeExit(constructor, actor.name);
WriteFunction(writer, constructor, constructor.BodyText); WriteFunction(writer, constructor, constructor.BodyText);

View File

@ -26,12 +26,17 @@
#include <stdarg.h> #include <stdarg.h>
#include <cinttypes> #include <cinttypes>
LineageReference<ActorLineage> rootLineage;
std::atomic<bool> startSampling = false; std::atomic<bool> startSampling = false;
thread_local Reference<ActorLineage> currentLineage; // TODO: Fix this (ideally get rid of allocation, otherwise memory leak?)
thread_local LineageReference<ActorLineage>* currentLineage = &rootLineage;//new LineageReference<ActorLineage>();
LineagePropertiesBase::~LineagePropertiesBase() {} LineagePropertiesBase::~LineagePropertiesBase() {}
ActorLineage::ActorLineage() : properties(), parent(currentLineage) {} ActorLineage::ActorLineage() : properties(), parent(*currentLineage) {
// TraceEvent("LUKAS_ActorLineage").detail("CurrentLineagePtr", reinterpret_cast<uintptr_t>(currentLineage)).detail("CurrentLineageRefPtr", reinterpret_cast<uintptr_t>(currentLineage->getPtr()));
}
ActorLineage::~ActorLineage() { ActorLineage::~ActorLineage() {
for (auto property : properties) { for (auto property : properties) {
@ -40,19 +45,53 @@ ActorLineage::~ActorLineage() {
} }
Reference<ActorLineage> getCurrentLineage() { Reference<ActorLineage> getCurrentLineage() {
if (!currentLineage.isValid() || currentLineage == currentLineage->getParent()) { // if (!currentLineage.isValid()/* || currentLineage == currentLineage->getParent()*/) {
replaceLineage(Reference<ActorLineage>{ new ActorLineage() }); if (!currentLineage->isValid() || !currentLineage->referencesSelf) {
// replaceLineage(LineageReference<ActorLineage>{ new ActorLineage() });
// *currentLineage = LineageReference<ActorLineage>{ new ActorLineage() };
// ActorLineage* lineage = new ActorLineage();
// *currentLineage = LineageReference<ActorLineage>::addRef(lineage);
// *currentLineage = LineageReference<ActorLineage>(new ActorLineage());
// currentLineage->referencesSelf = true;
// TraceEvent("LUKAS_getCurrent").detail("Ptr", reinterpret_cast<uintptr_t>(currentLineage->getPtr()));
currentLineage->setPtrUnsafe(new ActorLineage());
currentLineage->referencesSelf = true;
} }
return currentLineage; return *currentLineage;
} }
void sample(const Reference<ActorLineage>& lineage); // void sample(const Reference<ActorLineage>& lineage);
void sample(Reference<ActorLineage>* ptr);
void replaceLineage(Reference<ActorLineage> lineage) { void replaceLineage(LineageReference<ActorLineage>* lineage) {
// if (lineage.isValid()) {
// auto name = lineage->get(&StackLineage::actorName);
// if (name.has_value()) {
// TraceEvent("LUKAS_replaceLineage").detail("Name", name.value());
// }
// }
// if (currentLineage->isValid()) {
// auto name = (*currentLineage)->get(&StackLineage::actorName);
// if (name.has_value()) {
// TraceEvent("LUKAS_replaceLineageCurrentLineage").detail("Name", name.value());
// }
// }
// TraceEvent("LUKAS_replaceLineage").detail("Ptr", reinterpret_cast<uintptr_t>(lineage));
// TraceEvent("LUKAS_replaceLineage2").detail("IsValid", lineage->isValid()).detail("PtrPtr", reinterpret_cast<uintptr_t>(lineage->getPtr()));
if (!startSampling) { if (!startSampling) {
currentLineage = lineage; currentLineage = lineage;
} else { } else {
startSampling = false; startSampling = false;
// if (currentLineage->isValid()) {
// std::string stack = "";
// auto vec = (*currentLineage)->stack(&StackLineage::actorName);
// for (const auto& str : vec) {
// stack += std::string(reinterpret_cast<const char*>(str.begin()), str.size()) + " ";
// }
// TraceEvent("LUKAS_replaceLineage").detail("Stack", stack);
// }
sample(currentLineage); sample(currentLineage);
currentLineage = lineage; currentLineage = lineage;
} }

View File

@ -448,7 +448,7 @@ struct LineageProperties : LineagePropertiesBase {
} }
}; };
struct ActorLineage : ThreadUnsafeReferenceCounted<ActorLineage> { struct ActorLineage : ThreadSafeReferenceCounted<ActorLineage> {
friend class LocalLineage; friend class LocalLineage;
struct Property { struct Property {
@ -541,7 +541,8 @@ public:
}; };
extern std::atomic<bool> startSampling; extern std::atomic<bool> startSampling;
extern thread_local Reference<ActorLineage> currentLineage; // TODO: ThreadUnsafe?
extern thread_local LineageReference<ActorLineage>* currentLineage;
struct StackLineage : LineageProperties<StackLineage> { struct StackLineage : LineageProperties<StackLineage> {
static const std::string_view name; static const std::string_view name;
@ -549,25 +550,35 @@ struct StackLineage : LineageProperties<StackLineage> {
}; };
Reference<ActorLineage> getCurrentLineage(); Reference<ActorLineage> getCurrentLineage();
void replaceLineage(Reference<ActorLineage> lineage); void replaceLineage(LineageReference<ActorLineage>* lineage);
// This class can be used in order to modify all lineage properties // This class can be used in order to modify all lineage properties
// of actors created within a (non-actor) scope // of actors created within a (non-actor) scope
struct LocalLineage { struct LocalLineage {
Reference<ActorLineage> lineage = Reference<ActorLineage>{ new ActorLineage() }; LineageReference<ActorLineage> lineage = LineageReference<ActorLineage>{ new ActorLineage() };
Reference<ActorLineage> oldLineage; LineageReference<ActorLineage>* oldLineage;
LocalLineage() { LocalLineage() {
oldLineage = currentLineage; oldLineage = currentLineage;
replaceLineage(lineage); replaceLineage(&lineage);
} }
~LocalLineage() { ~LocalLineage() {
replaceLineage(oldLineage); replaceLineage(oldLineage);
} }
}; };
// TODO: No longer want this because we are now setting a global instead of just the field in a class
struct restore_lineage { struct restore_lineage {
Reference<ActorLineage> prev; // Reference<ActorLineage> lineage;
restore_lineage() : prev(currentLineage) {} LineageReference<ActorLineage>* prev;
// LineageReference<ActorLineage> prev;
// restore_lineage() : prev(*currentLineage) {
restore_lineage() : prev(currentLineage) {
// if (currentLineage != nullptr && currentLineage->isValid()) {
// prev = *currentLineage;
// }
// prev = currentLineage;
// replaceLineage(lineage);
}
~restore_lineage() { ~restore_lineage() {
replaceLineage(prev); replaceLineage(prev);
} }
@ -1148,53 +1159,104 @@ static inline void destruct(T& t) {
t.~T(); t.~T();
} }
// TODO: Rename, move to better spot (above)
struct CurrentLineageReplace {
LineageReference<ActorLineage>* oldLineage;
CurrentLineageReplace(LineageReference<ActorLineage>* with) : oldLineage(currentLineage) {
// currentLineage = with;
replaceLineage(with);
}
~CurrentLineageReplace() {
// currentLineage = oldLineage;
replaceLineage(oldLineage);
}
};
template <class ReturnValue> template <class ReturnValue>
struct Actor : SAV<ReturnValue> { struct Actor : SAV<ReturnValue> {
Reference<ActorLineage> lineage = currentLineage; // LineageReference<ActorLineage>* prev = currentLineage;
LineageReference<ActorLineage> lineage = *currentLineage;
// Reference<ActorLineage> lineage;
int8_t actor_wait_state; // -1 means actor is cancelled; 0 means actor is not waiting; 1-N mean waiting in callback int8_t actor_wait_state; // -1 means actor is cancelled; 0 means actor is not waiting; 1-N mean waiting in callback
// group # // group #
Actor() : SAV<ReturnValue>(1, 1), actor_wait_state(0) { Actor() : SAV<ReturnValue>(1, 1), actor_wait_state(0) {
/*++actorCount;*/ /*++actorCount;*/
replaceLineage(lineage); // if (currentLineage != nullptr && currentLineage->isValid()) {
// lineage = *currentLineage;
// }
// replaceLineage(&lineage);
// lineage.referencesSelf = false;
// currentLineage = &lineage;
}
~Actor() {
//--actorCount;
// replaceLineage(new LineageReference<ActorLineage>());
// replaceLineage(prev);
} }
//~Actor() { --actorCount; }
Reference<ActorLineage> setLineage() { Reference<ActorLineage> setLineage() {
auto res = currentLineage; Reference<ActorLineage> res = *currentLineage;
replaceLineage(lineage); // if (currentLineage != nullptr && currentLineage->isValid()) {
// res = *currentLineage;
// }
replaceLineage(&lineage);
return res; return res;
} }
LineageReference<ActorLineage>* lineageAddr() {
return std::addressof(lineage);
}
}; };
template <> template <>
struct Actor<void> { struct Actor<void> {
// This specialization is for a void actor (one not returning a future, hence also uncancellable) // This specialization is for a void actor (one not returning a future, hence also uncancellable)
Reference<ActorLineage> lineage = currentLineage; // LineageReference<ActorLineage>* prev = currentLineage;
LineageReference<ActorLineage> lineage = *currentLineage;
// Reference<ActorLineage> lineage;
int8_t actor_wait_state; // 0 means actor is not waiting; 1-N mean waiting in callback group # int8_t actor_wait_state; // 0 means actor is not waiting; 1-N mean waiting in callback group #
Actor() : actor_wait_state(0) { Actor() : actor_wait_state(0) {
/*++actorCount;*/ /*++actorCount;*/
replaceLineage(lineage); // if (currentLineage != nullptr && currentLineage->isValid()) {
// lineage = *currentLineage;
// }
// replaceLineage(&lineage);
// lineage.referencesSelf = false;
// currentLineage = &lineage;
}
~Actor() {
//--actorCount;
// replaceLineage(new LineageReference<ActorLineage>());
// replaceLineage(prev);
} }
//~Actor() { --actorCount; }
Reference<ActorLineage> setLineage() { Reference<ActorLineage> setLineage() {
auto res = currentLineage; Reference<ActorLineage> res = *currentLineage;
replaceLineage(lineage); // if (currentLineage != nullptr && currentLineage->isValid()) {
// res = *currentLineage;
// }
replaceLineage(&lineage);
return res; return res;
} }
LineageReference<ActorLineage>* lineageAddr() {
return std::addressof(lineage);
}
}; };
template <class ActorType, int CallbackNumber, class ValueType> template <class ActorType, int CallbackNumber, class ValueType>
struct ActorCallback : Callback<ValueType> { struct ActorCallback : Callback<ValueType> {
virtual void fire(ValueType const& value) override { virtual void fire(ValueType const& value) override {
auto _ = static_cast<ActorType*>(this)->setLineage(); // auto _ = static_cast<ActorType*>(this)->setLineage();
CurrentLineageReplace _(static_cast<ActorType*>(this)->lineageAddr());
static_cast<ActorType*>(this)->a_callback_fire(this, value); static_cast<ActorType*>(this)->a_callback_fire(this, value);
} }
virtual void error(Error e) override { virtual void error(Error e) override {
auto _ = static_cast<ActorType*>(this)->setLineage(); // auto _ = static_cast<ActorType*>(this)->setLineage();
CurrentLineageReplace _(static_cast<ActorType*>(this)->lineageAddr());
static_cast<ActorType*>(this)->a_callback_error(this, e); static_cast<ActorType*>(this)->a_callback_error(this, e);
} }
}; };
@ -1202,15 +1264,18 @@ struct ActorCallback : Callback<ValueType> {
template <class ActorType, int CallbackNumber, class ValueType> template <class ActorType, int CallbackNumber, class ValueType>
struct ActorSingleCallback : SingleCallback<ValueType> { struct ActorSingleCallback : SingleCallback<ValueType> {
void fire(ValueType const& value) override { void fire(ValueType const& value) override {
auto _ = static_cast<ActorType*>(this)->setLineage(); // auto _ = static_cast<ActorType*>(this)->setLineage();
CurrentLineageReplace _(static_cast<ActorType*>(this)->lineageAddr());
static_cast<ActorType*>(this)->a_callback_fire(this, value); static_cast<ActorType*>(this)->a_callback_fire(this, value);
} }
void fire(ValueType&& value) override { void fire(ValueType&& value) override {
auto _ = static_cast<ActorType*>(this)->setLineage(); // auto _ = static_cast<ActorType*>(this)->setLineage();
CurrentLineageReplace _(static_cast<ActorType*>(this)->lineageAddr());
static_cast<ActorType*>(this)->a_callback_fire(this, std::move(value)); static_cast<ActorType*>(this)->a_callback_fire(this, std::move(value));
} }
void error(Error e) override { void error(Error e) override {
auto _ = static_cast<ActorType*>(this)->setLineage(); // auto _ = static_cast<ActorType*>(this)->setLineage();
CurrentLineageReplace _(static_cast<ActorType*>(this)->lineageAddr());
static_cast<ActorType*>(this)->a_callback_error(this, e); static_cast<ActorType*>(this)->a_callback_error(this, e);
} }
}; };

View File

@ -1566,6 +1566,10 @@ struct YieldedFutureActor : SAV<Void>, ActorCallback<YieldedFutureActor, 1, Void
void destroy() override { delete this; } void destroy() override { delete this; }
Reference<ActorLineage> setLineage() { Reference<ActorLineage> setLineage() {
return *currentLineage;
}
LineageReference<ActorLineage>* lineageAddr() {
return currentLineage; return currentLineage;
} }