Annotation framework and role lineage

This commit is contained in:
Markus Pilman 2020-12-09 10:19:59 -07:00
parent 2c4e38329e
commit 0d324cee80
7 changed files with 142 additions and 14 deletions

View File

@ -22,6 +22,8 @@ set(FDBRPC_SRCS
ReplicationPolicy.cpp ReplicationPolicy.cpp
ReplicationTypes.cpp ReplicationTypes.cpp
ReplicationUtils.cpp ReplicationUtils.cpp
RoleLineage.h
RoleLineage.cpp
Stats.actor.cpp Stats.actor.cpp
Stats.h Stats.h
sim2.actor.cpp sim2.actor.cpp

View File

@ -63,6 +63,7 @@ struct ProcessClass {
Ratekeeper, Ratekeeper,
StorageCache, StorageCache,
Backup, Backup,
Worker, // used for actor lineage tracking
NoRole NoRole
}; };
enum ClassSource { CommandLineSource, AutoSource, DBSource, InvalidSource = -1 }; enum ClassSource { CommandLineSource, AutoSource, DBSource, InvalidSource = -1 };

23
fdbrpc/RoleLineage.cpp Normal file
View File

@ -0,0 +1,23 @@
/*
* RoleLineage.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbrpc/RoleLineage.h"
StringRef RoleLineage::name = "RoleLineage"_sr;

31
fdbrpc/RoleLineage.h Normal file
View File

@ -0,0 +1,31 @@
/*
* RoleLineage.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "fdbrpc/Locality.h"
struct RoleLineage : LineageProperties<RoleLineage> {
static StringRef name;
ProcessClass::ClusterRole role = ProcessClass::NoRole;
bool isSet(ProcessClass::ClusterRole RoleLineage::*member) {
return this->*member != ProcessClass::NoRole;
}
};

View File

@ -22,6 +22,7 @@
#include <boost/lexical_cast.hpp> #include <boost/lexical_cast.hpp>
#include "fdbrpc/Locality.h" #include "fdbrpc/Locality.h"
#include "fdbrpc/RoleLineage.h"
#include "fdbclient/StorageServerInterface.h" #include "fdbclient/StorageServerInterface.h"
#include "fdbserver/Knobs.h" #include "fdbserver/Knobs.h"
#include "flow/ActorCollection.h" #include "flow/ActorCollection.h"
@ -46,6 +47,7 @@
#include "flow/Profiler.h" #include "flow/Profiler.h"
#include "flow/ThreadHelper.actor.h" #include "flow/ThreadHelper.actor.h"
#include "flow/Trace.h" #include "flow/Trace.h"
#include "flow/flow.h"
#ifdef __linux__ #ifdef __linux__
#include <fcntl.h> #include <fcntl.h>
@ -1810,6 +1812,7 @@ ACTOR Future<Void> fdbd(
{ {
state vector<Future<Void>> actors; state vector<Future<Void>> actors;
state Promise<Void> recoveredDiskFiles; state Promise<Void> recoveredDiskFiles;
currentLineage->modify(&RoleLineage::role) = ProcessClass::Worker;
try { try {
ServerCoordinators coordinators( connFile ); ServerCoordinators coordinators( connFile );

View File

@ -31,6 +31,12 @@ thread_local Reference<ActorLineage> currentLineage;
ActorLineage::ActorLineage() : parent(currentLineage) { ActorLineage::ActorLineage() : parent(currentLineage) {
} }
ActorLineage::~ActorLineage() {
for (auto ptr : properties) {
delete ptr.second;
}
}
#if (defined(__linux__) || defined(__FreeBSD__)) && defined(__AVX__) && !defined(MEMORY_SANITIZER) #if (defined(__linux__) || defined(__FreeBSD__)) && defined(__AVX__) && !defined(MEMORY_SANITIZER)
// For benchmarking; need a version of rte_memcpy that doesn't live in the same compilation unit as the test. // For benchmarking; need a version of rte_memcpy that doesn't live in the same compilation unit as the test.
void * rte_memcpy_noinline(void *__restrict __dest, const void *__restrict __src, size_t __n) { void * rte_memcpy_noinline(void *__restrict __dest, const void *__restrict __src, size_t __n) {

View File

@ -20,6 +20,7 @@
#ifndef FLOW_FLOW_H #ifndef FLOW_FLOW_H
#define FLOW_FLOW_H #define FLOW_FLOW_H
#include "flow/Arena.h"
#include "flow/FastRef.h" #include "flow/FastRef.h"
#pragma once #pragma once
@ -29,6 +30,7 @@
#include <vector> #include <vector>
#include <queue> #include <queue>
#include <stack>
#include <map> #include <map>
#include <unordered_map> #include <unordered_map>
#include <set> #include <set>
@ -409,21 +411,88 @@ struct SingleCallback {
} }
}; };
struct ActorLineagePropertyMap : ReferenceCounted<ActorLineagePropertyMap> { struct LineagePropertiesBase {
};
// helper class to make implementation of LineageProperties easier
template<class Derived>
struct LineageProperties : LineagePropertiesBase {
// Contract:
//
// StringRef name = "SomeUniqueName"_str;
// this has to be implemented by subclasses
// but can't be made virtual.
// A user should implement this for any type
// within the properies class.
template<class Value>
bool isSet(Value Derived::*member) {
return true;
}
}; };
struct ActorLineage : ReferenceCounted<ActorLineage> { struct ActorLineage : ReferenceCounted<ActorLineage> {
Reference<ActorLineagePropertyMap> map; private:
std::unordered_map<StringRef, LineagePropertiesBase*> properties;
Reference<ActorLineage> parent; Reference<ActorLineage> parent;
public:
ActorLineage(); ActorLineage();
~ActorLineage();
bool isRoot() const {
return parent.getPtr() == nullptr;
}
void makeRoot() {
parent.clear();
}
template <class T, class V>
V& modify(V T::*member) {
auto& res = properties[T::name];
if (!res) {
res = new T{};
}
T* map = static_cast<T*>(res);
return map->*member;
}
template <class T, class V>
std::optional<V> get(V T::*member) const {
auto current = this;
while (current != nullptr) {
auto iter = current->properties.find(T::name);
if (iter != current->properties.end()) {
T const& map = static_cast<T const&>(*iter->second);
if (map.isSet(member)) {
return map.*member;
}
}
current = current->parent.getPtr();
}
return std::optional<V>{};
}
template <class T, class V>
std::stack<V> stack(V T::*member) const {
auto current = this;
std::stack<V> res;
while (current != nullptr) {
auto iter = current->properties.find(T::name);
if (iter != current->properties.end()) {
T const& map = static_cast<T const&>(*iter->second);
if (map.isSet(member)) {
res.push(map.*member);
}
}
current = current->parent.getPtr();
}
return res;
}
}; };
extern thread_local Reference<ActorLineage> currentLineage; extern thread_local Reference<ActorLineage> currentLineage;
struct restore_lineage { struct restore_lineage {
Reference<ActorLineage> lineage; Reference<ActorLineage> prev;
restore_lineage() : lineage(currentLineage) {} restore_lineage() : prev(currentLineage) {}
~restore_lineage() { currentLineage = lineage; } ~restore_lineage() { currentLineage = prev; }
}; };
// SAV is short for Single Assignment Variable: It can be assigned for only once! // SAV is short for Single Assignment Variable: It can be assigned for only once!
@ -465,7 +534,6 @@ public:
ASSERT(canBeSet()); ASSERT(canBeSet());
new (&value_storage) T(std::forward<U>(value)); new (&value_storage) T(std::forward<U>(value));
this->error_state = Error::fromCode(SET_ERROR_CODE); this->error_state = Error::fromCode(SET_ERROR_CODE);
restore_lineage _;
while (Callback<T>::next != this) { while (Callback<T>::next != this) {
Callback<T>::next->fire(this->value()); Callback<T>::next->fire(this->value());
} }
@ -479,7 +547,6 @@ public:
void sendError(Error err) { void sendError(Error err) {
ASSERT(canBeSet() && int16_t(err.code()) > 0); ASSERT(canBeSet() && int16_t(err.code()) > 0);
this->error_state = err; this->error_state = err;
restore_lineage _;
while (Callback<T>::next != this) { while (Callback<T>::next != this) {
Callback<T>::next->error(err); Callback<T>::next->error(err);
} }
@ -487,7 +554,6 @@ public:
template <class U> template <class U>
void sendAndDelPromiseRef(U && value) { void sendAndDelPromiseRef(U && value) {
restore_lineage _;
ASSERT(canBeSet()); ASSERT(canBeSet());
if (promises == 1 && !futures) { if (promises == 1 && !futures) {
// No one is left to receive the value, so we can just die // No one is left to receive the value, so we can just die
@ -501,7 +567,6 @@ public:
void finishSendAndDelPromiseRef() { void finishSendAndDelPromiseRef() {
// Call only after value_storage has already been initialized! // Call only after value_storage has already been initialized!
restore_lineage _;
this->error_state = Error::fromCode(SET_ERROR_CODE); this->error_state = Error::fromCode(SET_ERROR_CODE);
while (Callback<T>::next != this) while (Callback<T>::next != this)
Callback<T>::next->fire(this->value()); Callback<T>::next->fire(this->value());
@ -518,7 +583,6 @@ public:
} }
void sendErrorAndDelPromiseRef(Error err) { void sendErrorAndDelPromiseRef(Error err) {
restore_lineage _;
ASSERT(canBeSet() && int16_t(err.code()) > 0); ASSERT(canBeSet() && int16_t(err.code()) > 0);
if (promises == 1 && !futures) { if (promises == 1 && !futures) {
// No one is left to receive the value, so we can just die // No one is left to receive the value, so we can just die
@ -622,7 +686,6 @@ struct NotifiedQueue : private SingleCallback<T>, FastAllocated<NotifiedQueue<T>
if (error.isValid()) return; if (error.isValid()) return;
if (SingleCallback<T>::next != this) { if (SingleCallback<T>::next != this) {
restore_lineage _;
SingleCallback<T>::next->fire(std::forward<U>(value)); SingleCallback<T>::next->fire(std::forward<U>(value));
} }
else { else {
@ -635,7 +698,6 @@ struct NotifiedQueue : private SingleCallback<T>, FastAllocated<NotifiedQueue<T>
this->error = err; this->error = err;
if (SingleCallback<T>::next != this) { if (SingleCallback<T>::next != this) {
restore_lineage _;
SingleCallback<T>::next->error(err); SingleCallback<T>::next->error(err);
} }
} }
@ -1025,13 +1087,13 @@ struct Actor : SAV<ReturnValue> {
/*++actorCount;*/ /*++actorCount;*/
currentLineage = lineage; currentLineage = lineage;
} }
//~Actor() { --actorCount; }
Reference<ActorLineage> setLineage() { Reference<ActorLineage> setLineage() {
auto res = currentLineage; auto res = currentLineage;
currentLineage = lineage; currentLineage = lineage;
return res; return res;
} }
//~Actor() { --actorCount; }
}; };
template <> template <>
@ -1045,13 +1107,13 @@ struct Actor<void> {
/*++actorCount;*/ /*++actorCount;*/
currentLineage = lineage; currentLineage = lineage;
} }
//~Actor() { --actorCount; }
Reference<ActorLineage> setLineage() { Reference<ActorLineage> setLineage() {
auto res = currentLineage; auto res = currentLineage;
currentLineage = lineage; currentLineage = lineage;
return res; return res;
} }
//~Actor() { --actorCount; }
}; };
template <class ActorType, int CallbackNumber, class ValueType> template <class ActorType, int CallbackNumber, class ValueType>