Provide actor call backtrace

See design/AcAC.md
This commit is contained in:
Xiaoge Su 2023-06-20 16:26:48 -07:00
parent 3c5a38b120
commit 91ec1fdf10
33 changed files with 794 additions and 138 deletions

View File

@ -53,6 +53,15 @@ endif()
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/bin)
set(LIBRARY_OUTPUT_PATH ${PROJECT_BINARY_DIR}/lib)
option(WITH_ACAC "Enable actor stack recording" OFF)
if (WITH_ACAC)
message(STATUS "Build FoundationDB with AcAC support")
if (FDB_RELEASE OR FDB_RELEASE_CANDITATE)
message(FATAL_ERROR "ACAC will cause severe slowdown of the system and SHOULD not be enabled in Release.")
endif()
add_compile_definitions(WITH_ACAC)
endif()
################################################################################
# Packages used for bindings
################################################################################
@ -242,3 +251,5 @@ endif()
print_components()
message(STATUS "CPACK_COMPONENTS_ALL ${CPACK_COMPONENTS_ALL}")

View File

@ -9,7 +9,7 @@ function(compile_boost)
# Configure bootstrap command
set(BOOTSTRAP_COMMAND "./bootstrap.sh")
set(BOOTSTRAP_LIBRARIES "context,filesystem,iostreams,system,serialization")
set(BOOTSTRAP_LIBRARIES "context,filesystem,iostreams,system,serialization,program_options")
set(BOOST_CXX_COMPILER "${CMAKE_CXX_COMPILER}")
# Can't build Boost with Intel compiler, use clang instead.
@ -88,12 +88,17 @@ function(compile_boost)
"${BOOST_INSTALL_DIR}/lib/libboost_filesystem.a"
"${BOOST_INSTALL_DIR}/lib/libboost_iostreams.a"
"${BOOST_INSTALL_DIR}/lib/libboost_serialization.a"
"${BOOST_INSTALL_DIR}/lib/libboost_system.a")
"${BOOST_INSTALL_DIR}/lib/libboost_system.a"
"${BOOST_INSTALL_DIR}/lib/libboost_program_options.a")
add_library(${COMPILE_BOOST_TARGET}_context STATIC IMPORTED)
add_dependencies(${COMPILE_BOOST_TARGET}_context ${COMPILE_BOOST_TARGET}Project)
set_target_properties(${COMPILE_BOOST_TARGET}_context PROPERTIES IMPORTED_LOCATION "${BOOST_INSTALL_DIR}/lib/libboost_context.a")
add_library(${COMPILE_BOOST_TARGET}_program_options STATIC IMPORTED)
add_dependencies(${COMPILE_BOOST_TARGET}_program_options ${COMPILE_BOOST_TARGET}Project)
set_target_properties(${COMPILE_BOOST_TARGET}_program_options PROPERTIES IMPORTED_LOCATION "${BOOST_INSTALL_DIR}/lib/libboost_program_options.a")
add_library(${COMPILE_BOOST_TARGET}_filesystem STATIC IMPORTED)
add_dependencies(${COMPILE_BOOST_TARGET}_filesystem ${COMPILE_BOOST_TARGET}Project)
set_target_properties(${COMPILE_BOOST_TARGET}_filesystem PROPERTIES IMPORTED_LOCATION "${BOOST_INSTALL_DIR}/lib/libboost_filesystem.a")
@ -138,11 +143,11 @@ set(Boost_USE_STATIC_LIBS ON)
if (UNIX AND CMAKE_CXX_COMPILER_ID MATCHES "Clang$" AND USE_LIBCXX)
list(APPEND CMAKE_PREFIX_PATH /opt/boost_1_78_0_clang)
set(BOOST_HINT_PATHS /opt/boost_1_78_0_clang)
message(STATUS "Using Clang version of boost::context boost::filesystem and boost::iostreams")
message(STATUS "Using Clang version of boost")
else ()
list(APPEND CMAKE_PREFIX_PATH /opt/boost_1_78_0)
set(BOOST_HINT_PATHS /opt/boost_1_78_0)
message(STATUS "Using g++ version of boost::context boost::filesystem and boost::iostreams")
message(STATUS "Using g++ version of boost")
endif ()
if(BOOST_ROOT)
@ -154,13 +159,16 @@ if(WIN32)
# properly for config mode. So we use the old way on Windows
# find_package(Boost 1.72.0 EXACT QUIET REQUIRED CONFIG PATHS ${BOOST_HINT_PATHS})
# I think depending on the cmake version this will cause weird warnings
find_package(Boost 1.78 COMPONENTS filesystem iostreams serialization system)
find_package(Boost 1.78 COMPONENTS filesystem iostreams serialization system program_options)
add_library(boost_target INTERFACE)
target_link_libraries(boost_target INTERFACE Boost::boost Boost::filesystem Boost::iostreams Boost::serialization Boost::system)
add_library(boost_target_program_options INTERFACE)
target_link_libraries(boost_target_program_options INTERFACE Boost::boost Boost::program_options)
return()
endif()
find_package(Boost 1.78.0 EXACT QUIET COMPONENTS context filesystem iostreams serialization system CONFIG PATHS ${BOOST_HINT_PATHS})
find_package(Boost 1.78.0 EXACT QUIET COMPONENTS context filesystem iostreams program_options serialization system CONFIG PATHS ${BOOST_HINT_PATHS})
set(FORCE_BOOST_BUILD OFF CACHE BOOL "Forces cmake to build boost and ignores any installed boost")
# The precompiled boost silently broke in CI. While investigating, I considered extending
@ -179,6 +187,9 @@ set(FORCE_BOOST_BUILD OFF CACHE BOOL "Forces cmake to build boost and ignores an
if(Boost_FOUND AND NOT FORCE_BOOST_BUILD)
add_library(boost_target INTERFACE)
target_link_libraries(boost_target INTERFACE Boost::boost Boost::context Boost::filesystem Boost::iostreams Boost::serialization Boost::system)
add_library(boost_target_program_options INTERFACE)
target_link_libraries(boost_target_program_options INTERFACE Boost::boost Boost::program_options)
elseif(WIN32)
message(FATAL_ERROR "Could not find Boost")
else()

View File

@ -60,6 +60,8 @@ public:
std::shared_ptr<AzureClient> client;
public:
virtual StringRef getClassName() override { return "BackupContainerAzureBlobStoreImpl::ReadFile"_sr; }
ReadFile(AsyncTaskThread& asyncTaskThread,
const std::string& containerName,
const std::string& blobName,
@ -121,6 +123,8 @@ public:
static constexpr size_t bufferLimit = 1 << 20;
public:
virtual StringRef getClassName() override { return "BackupContainerAzureBlobStoreImpl::WriteFile"_sr; }
WriteFile(AsyncTaskThread& asyncTaskThread,
const std::string& containerName,
const std::string& blobName,

View File

@ -60,6 +60,8 @@ public:
void addref() override { ReferenceCounted<AsyncFileS3BlobStoreWrite>::addref(); }
void delref() override { ReferenceCounted<AsyncFileS3BlobStoreWrite>::delref(); }
virtual StringRef getClassName() override { return "AsyncFileS3BlobStoreWrite"_sr; }
struct Part : ReferenceCounted<Part> {
Part(int n, int minSize)
: number(n), writer(content.getWriteBuffer(minSize), nullptr, Unversioned()), length(0) {
@ -266,6 +268,8 @@ public:
void addref() override { ReferenceCounted<AsyncFileS3BlobStoreRead>::addref(); }
void delref() override { ReferenceCounted<AsyncFileS3BlobStoreRead>::delref(); }
virtual StringRef getClassName() override { return "AsyncFileS3BlobStoreRead"_sr; }
Future<int> read(void* data, int length, int64_t offset) override;
Future<Void> write(void const* data, int length, int64_t offset) override { throw file_not_writable(); }

View File

@ -140,6 +140,8 @@ class AsyncFileCached final : public IAsyncFile, public ReferenceCounted<AsyncFi
friend struct AFCPage;
public:
virtual StringRef getClassName() override { return "AsyncFileCached"_sr; }
// Opens a file that uses the FDB in-memory page cache
static Future<Reference<IAsyncFile>> open(std::string filename, int flags, int mode) {
//TraceEvent("AsyncFileCachedOpen").detail("Filename", filename);

View File

@ -44,6 +44,8 @@ public:
void addref() override { ReferenceCounted<AsyncFileChaos>::addref(); }
void delref() override { ReferenceCounted<AsyncFileChaos>::delref(); }
virtual StringRef getClassName() override { return "AsyncFileReadAheadCache"_sr; }
double getDelay() const {
double delayFor = 0.0;
if (!enabled)

View File

@ -45,6 +45,8 @@
class AsyncFileEIO : public IAsyncFile, public ReferenceCounted<AsyncFileEIO> {
public:
virtual StringRef getClassName() override { return "AsyncFileDetachable"_sr; }
static void init() {
eio_set_max_parallel(FLOW_KNOBS->EIO_MAX_PARALLELISM);
if (eio_init(&eio_want_poll, nullptr)) {

View File

@ -35,6 +35,8 @@ class AsyncFileEncrypted : public IAsyncFile, public ReferenceCounted<AsyncFileE
public:
enum class Mode { APPEND_ONLY, READ_ONLY };
virtual StringRef getClassName() override { return "AsyncFileEncrypted"_sr; }
private:
Reference<IAsyncFile> file;
StreamCipher::IV firstBlockIV;

View File

@ -58,6 +58,8 @@ DESCR struct SlowAioSubmit {
class AsyncFileKAIO final : public IAsyncFile, public ReferenceCounted<AsyncFileKAIO> {
public:
virtual StringRef getClassName() override { return "AsyncFileKAIO"_sr; }
struct AsyncFileKAIOMetrics {
LatencySample readLatencySample = { "AsyncFileKAIOReadLatency",
UID(),

View File

@ -71,6 +71,8 @@ private:
bool assertOnReadWriteCancel;
public:
virtual StringRef getClassName() override { return "AsyncFileDetachable"_sr; }
explicit AsyncFileDetachable(Reference<IAsyncFile> file) : file(file), assertOnReadWriteCancel(true) {
shutdown = doShutdown(this);
}
@ -103,6 +105,8 @@ public:
// killed This is used to simulate a power failure which prevents all written data from being persisted to disk
class AsyncFileNonDurable final : public IAsyncFile, public ReferenceCounted<AsyncFileNonDurable> {
public:
virtual StringRef getClassName() override { return "AsyncFileNonDurable"_sr; }
UID id;
std::string filename;

View File

@ -39,6 +39,8 @@ public:
void addref() override { ReferenceCounted<AsyncFileReadAheadCache>::addref(); }
void delref() override { ReferenceCounted<AsyncFileReadAheadCache>::delref(); }
virtual StringRef getClassName() override { return "AsyncFileReadAheadCache"_sr; }
struct CacheBlock : ReferenceCounted<CacheBlock> {
CacheBlock(int size = 0) : data(new uint8_t[size]), len(size) {}
~CacheBlock() { delete[] data; }

View File

@ -43,6 +43,8 @@ public:
static void stop() {}
virtual StringRef getClassName() override { return "AsnycFileWinASIO"_sr; }
static bool should_poll() { return false; }
// FIXME: This implementation isn't actually asynchronous - it just does operations synchronously!

View File

@ -30,6 +30,8 @@ public:
void addref() override { ReferenceCounted<AsyncFileWriteChecker>::addref(); }
void delref() override { ReferenceCounted<AsyncFileWriteChecker>::delref(); }
virtual StringRef getClassName() override { return "AsyncFileWriteChecker"_sr; }
// For read() and write(), the data buffer must remain valid until the future is ready
Future<int> read(void* data, int length, int64_t offset) override {
// Lambda must hold a reference to this to keep it alive until after the read

View File

@ -656,6 +656,8 @@ class SimpleFile : public IAsyncFile, public ReferenceCounted<SimpleFile> {
public:
static void init() {}
virtual StringRef getClassName() override { return "SimpleFile"_sr; }
static bool should_poll() { return false; }
ACTOR static Future<Reference<IAsyncFile>> open(
@ -2905,6 +2907,7 @@ Future<Reference<IUDPSocket>> Sim2::createUDPSocket(bool isV6) {
void startNewSimulator(bool printSimTime) {
ASSERT(!g_network);
ASSERT(!g_simulator);
g_network = g_simulator = new Sim2(printSimTime);
g_simulator->connectionFailuresDisableDuration =
deterministicRandom()->coinflip() ? 0 : DISABLE_CONNECTION_FAILURE_FOREVER;

201
flow/ActorContext.cpp Normal file
View File

@ -0,0 +1,201 @@
#include "flow/ActorContext.h"
#ifdef WITH_ACAC
#include <iomanip>
#include <mutex>
#include "flow/flow.h"
#include "libb64/encode.h"
#include "libb64/decode.h"
namespace {
std::vector<ActorExecutionContext> g_currentExecutionContext;
std::unordered_map<ActorID, ActiveActor> g_activeActors;
ActorID getActorID() {
static thread_local ActorID actorID = INIT_ACTOR_ID;
return ++actorID;
}
inline ActorID getActorSpawnerID() {
if (g_currentExecutionContext.empty()) {
return INIT_ACTOR_ID;
}
return g_currentExecutionContext.back().actorID;
}
inline bool isActorOnMainThread() {
// The INetwork framework behaves differently in Net2 and Sim2.
// For Net2, when Net2::run() is called, the N2::thread_network is set to be the current Net2 instance.
// For Sim2, it tests if on main thread by calling the underlying Net2 instance, however, since Net2::run()
// is never called, the N2::thread_network will always be nullptr. In this case, Sim2::isOnMainThread will always
// return false and not reliable.
if (g_network) [[likely]] {
return g_network->isSimulated() ? true : g_network->isOnMainThread();
} else {
return false;
}
}
} // anonymous namespace
using ActiveActorsCount_t = uint32_t;
ActiveActor::ActiveActor() : identifier(), id(), spawnTime(0.0), spawner(INVALID_ACTOR_ID) {}
ActiveActor::ActiveActor(const ActorIdentifier& identifier_, const ActorID& id_, const ActorID& spawnerID_)
: identifier(identifier_), id(id_), spawnTime(g_network != nullptr ? g_network->now() : 0.0), spawner(spawnerID_) {}
ActiveActorHelper::ActiveActorHelper(const ActorIdentifier& actorIdentifier) {
if (!isActorOnMainThread()) [[unlikely]] {
return;
}
const auto actorID_ = getActorID();
const auto spawnerActorID = getActorSpawnerID();
actorID = actorID_;
g_activeActors[actorID] = ActiveActor(actorIdentifier, actorID, spawnerActorID);
}
ActiveActorHelper::~ActiveActorHelper() {
if (!isActorOnMainThread()) [[unlikely]] {
return;
}
g_activeActors.erase(actorID);
}
ActorExecutionContextHelper::ActorExecutionContextHelper(const ActorID& actorID_,
const ActorBlockIdentifier& blockIdentifier_) {
if (!isActorOnMainThread()) [[unlikely]] {
return;
}
g_currentExecutionContext.emplace_back(actorID_, blockIdentifier_);
}
ActorExecutionContextHelper::~ActorExecutionContextHelper() {
if (!isActorOnMainThread()) [[unlikely]] {
return;
}
if (g_currentExecutionContext.empty()) [[unlikely]] {
// This should not happen, abort the program if it happens.
std::abort();
}
g_currentExecutionContext.pop_back();
}
// TODO: Rewrite this function for better display
void dumpActors(std::ostream& stream) {
stream << "Current active ACTORs:" << std::endl;
for (const auto& [actorID, activeActor] : g_activeActors) {
stream << std::setw(10) << actorID << " " << activeActor.identifier.toString() << std::endl;
if (activeActor.spawner != INVALID_ACTOR_ID) {
stream << " Spawn by " << std::setw(10) << activeActor.spawner << std::endl;
}
}
}
namespace {
std::vector<ActiveActor> getCallBacktraceOfActor(const ActorID& actorID) {
std::vector<ActiveActor> actorBacktrace;
auto currentActorID = actorID;
for (;;) {
if (currentActorID == INIT_ACTOR_ID) {
// Reaching the root
break;
}
if (g_activeActors.count(currentActorID) == 0) {
// TODO: Understand why this happens and react properly
break;
}
actorBacktrace.push_back(g_activeActors.at(currentActorID));
if (g_activeActors.at(currentActorID).spawner != INVALID_ACTOR_ID) {
currentActorID = g_activeActors.at(currentActorID).spawner;
} else {
// TODO: Understand why the actor has no spawner ID
break;
}
}
return actorBacktrace;
}
} // anonymous namespace
std::string encodeActorContext(const ActorContextDumpType dumpType) {
BinaryWriter writer(Unversioned());
auto writeActorInfo = [&writer](const ActiveActor& actor) {
writer << actor.id << actor.identifier << actor.spawner;
};
writer << static_cast<uint8_t>(dumpType)
<< (g_currentExecutionContext.empty() ? INVALID_ACTOR_ID : g_currentExecutionContext.back().actorID);
switch (dumpType) {
case ActorContextDumpType::FULL_CONTEXT:
writer << static_cast<ActiveActorsCount_t>(g_activeActors.size());
for (const auto& [actorID, activeActor] : g_activeActors) {
writeActorInfo(activeActor);
}
break;
case ActorContextDumpType::CURRENT_STACK:
// Only current call stack
{
if (g_currentExecutionContext.empty()) {
writer << static_cast<ActiveActorsCount_t>(0);
break;
}
writer << static_cast<ActiveActorsCount_t>(g_currentExecutionContext.size());
for (const auto& context : g_currentExecutionContext) {
writeActorInfo(g_activeActors.at(context.actorID));
}
}
break;
case ActorContextDumpType::CURRENT_CALL_BACKTRACE:
// The call backtrace of current active actor
{
if (g_currentExecutionContext.empty()) {
writer << static_cast<ActiveActorsCount_t>(0);
break;
}
const auto actors = getCallBacktraceOfActor(g_currentExecutionContext.back().actorID);
writer << static_cast<ActiveActorsCount_t>(actors.size());
for (const auto& item : actors) {
writeActorInfo(item);
}
}
break;
default:
UNREACHABLE();
}
const std::string data = writer.toValue().toString();
return base64::encoder::from_string(data);
}
DecodedActorContext decodeActorContext(const std::string& caller) {
DecodedActorContext result;
const auto decoded = base64::decoder::from_string(caller);
BinaryReader reader(decoded, Unversioned());
std::underlying_type_t<ActorContextDumpType> dumpTypeRaw;
reader >> dumpTypeRaw;
result.dumpType = static_cast<ActorContextDumpType>(dumpTypeRaw);
reader >> result.currentRunningActor;
ActiveActorsCount_t actorCount;
reader >> actorCount;
std::unordered_map<ActorID, std::tuple<ActorID, ActorIdentifier, ActorID>> actors;
for (ActiveActorsCount_t i = 0; i < actorCount; ++i) {
ActorID id;
ActorID spawner;
ActorIdentifier identifier;
reader >> id >> identifier >> spawner;
result.context.emplace_back(id, identifier, spawner);
}
return result;
}
#endif // WITH_ACAC

View File

@ -12,8 +12,10 @@ if (FLOW_USE_ZSTD)
endif()
# Remove files with `main` defined so we can create a link test executable.
list(REMOVE_ITEM FLOW_SRCS LinkTest.cpp)
list(REMOVE_ITEM FLOW_SRCS TLSTest.cpp)
list(REMOVE_ITEM FLOW_SRCS MkCertCli.cpp)
list(REMOVE_ITEM FLOW_SRCS acac.cpp)
if(CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64")
list(APPEND FLOW_SRCS aarch64/memcmp.S aarch64/memcpy.S)
@ -71,7 +73,14 @@ if(APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "arm64")
endif()
foreach(ft flow flow_sampling flowlinktest)
target_include_directories(${ft} PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include")
target_include_directories(
${ft}
PUBLIC
"${CMAKE_CURRENT_SOURCE_DIR}/include"
"${CMAKE_CURRENT_BINARY_DIR}/include"
"${CMAKE_SOURCE_DIR}/contrib/libb64/include"
)
if (FLOW_USE_ZSTD)
target_include_directories(${ft} PRIVATE SYSTEM ${ZSTD_LIB_INCLUDE_DIR})
endif()
@ -80,7 +89,7 @@ foreach(ft flow flow_sampling flowlinktest)
endif()
target_link_libraries(${ft} PRIVATE stacktrace)
target_link_libraries(${ft} PUBLIC fmt::fmt SimpleOpt crc32)
target_link_libraries(${ft} PUBLIC fmt::fmt SimpleOpt crc32 libb64)
if(UNIX AND NOT APPLE)
target_link_libraries(${ft} PRIVATE folly_memcpy)
target_compile_definitions(${ft} PRIVATE WITH_FOLLY_MEMCPY)
@ -105,12 +114,10 @@ foreach(ft flow flow_sampling flowlinktest)
if(USE_VALGRIND)
target_link_libraries(${ft} PUBLIC Valgrind)
endif()
target_link_libraries(${ft} PUBLIC OpenSSL::SSL)
target_link_libraries(${ft} PUBLIC Threads::Threads ${CMAKE_DL_LIBS})
target_link_libraries(${ft} PUBLIC boost_target)
if(USE_VALGRIND)
target_link_libraries(${ft} PUBLIC Valgrind)
endif()
if(APPLE)
find_library(IO_KIT IOKit)
@ -119,6 +126,9 @@ foreach(ft flow flow_sampling flowlinktest)
endif()
endforeach()
add_executable(acac acac.cpp)
target_link_libraries(acac PUBLIC flow boost_target_program_options)
target_compile_definitions(flow_sampling PRIVATE -DENABLE_SAMPLING)
if(WIN32)
add_dependencies(flow_sampling_actors flow_actors)
@ -232,3 +242,4 @@ if (WITH_SWIFT)
add_dependencies(flow flow_swift)
add_dependencies(flow_sampling flow_swift)
endif() # WITH SWIFT

View File

@ -101,10 +101,12 @@ Error internal_error_impl(const char* a_nm,
return Error(error_code_internal_error);
}
Error::Error() : error_code(invalid_error_code), flags(0) {}
Error::Error(int error_code) : error_code(error_code), flags(0) {
if (TRACE_SAMPLE())
TraceEvent(SevSample, "ErrorCreated").detail("ErrorCode", error_code);
// std::cout << "Error: " << error_code << std::endl;
if (error_code >= 3000 && error_code < 6000) {
{
TraceEvent te(SevError, "SystemError");

View File

@ -24,6 +24,7 @@
#include "flow/MkCert.h"
#include "flow/PKey.h"
#include "flow/ScopeExit.h"
#include "flow/Trace.h"
#include <limits>
#include <memory>

View File

@ -123,7 +123,7 @@ class Connection;
// Outlives main
Net2* g_net2 = nullptr;
thread_local INetwork* thread_network = 0;
thread_local INetwork* thread_network = nullptr;
class Net2 final : public INetwork, public INetworkConnections {

View File

@ -18,9 +18,12 @@
* limitations under the License.
*/
#include "flow/PKey.h"
#include "flow/AutoCPointer.h"
#include "flow/Error.h"
#include "flow/PKey.h"
#include "flow/Trace.h"
#include <openssl/bio.h>
#include <openssl/err.h>
#include <openssl/evp.h>

View File

@ -18,6 +18,7 @@
* limitations under the License.
*/
#include "flow/ActorContext.h"
#include "flow/Trace.h"
#include "flow/FileTraceLogWriter.h"
#include "flow/Knobs.h"
@ -1337,6 +1338,9 @@ void BaseTraceEvent::log() {
if (this->severity == SevError) {
severity = SevInfo;
backtrace();
#ifdef WITH_ACAC
detail("ActorStack", encodeActorContext(ActorContextDumpType::CURRENT_CALL_BACKTRACE));
#endif // WITH_ACAC
severity = SevError;
if (errorKindIndex != -1) {
fields.mutate(errorKindIndex).second = toString(errorKind);

169
flow/acac.cpp Normal file
View File

@ -0,0 +1,169 @@
#ifdef WITH_ACAC
#include <deque>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <boost/algorithm/string.hpp>
#include <boost/program_options/options_description.hpp>
#include <boost/program_options/variables_map.hpp>
#include <boost/program_options.hpp>
#include "flow/ActorContext.h"
std::unordered_map<UID, std::string> loadUIDActorMapping(const std::string& build_directory_path) {
std::unordered_map<UID, std::string> identifierToActor;
for (const auto& dirEntry : std::filesystem::recursive_directory_iterator(build_directory_path)) {
if (!dirEntry.is_regular_file()) {
continue;
}
if (dirEntry.path().extension() != ".uid") {
continue;
}
std::ifstream ifs(dirEntry.path());
for (;;) {
std::string line;
if (!std::getline(ifs, line)) {
break;
}
std::stringstream ss;
ss << line;
uint64_t part1, part2;
char pipeChar;
std::string actorName;
ss >> part1 >> pipeChar >> part2 >> pipeChar >> actorName;
identifierToActor[UID(part1, part2)] = actorName;
}
}
return identifierToActor;
}
void dumpActorContextTree(std::ostream& stream,
const DecodedActorContext& decoded,
const std::unordered_map<UID, std::string>& identifierToActor) {
std::unordered_map<ActorID, std::vector<ActorID>> spawnInfo;
for (const auto& [actorID, _1, parentID] : decoded.context) {
spawnInfo[parentID].push_back(actorID);
}
std::unordered_map<ActorID, std::string> actorNames;
for (const auto& [actorID, actorIdentifier, _1] : decoded.context) {
actorNames[actorID] = identifierToActor.at(actorIdentifier);
}
// 2-space indentation
constexpr int INDENT = 2;
std::deque<std::pair<ActorID, int>> actorQueue;
actorQueue.push_back({ INIT_ACTOR_ID, 0 });
while (!actorQueue.empty()) {
const auto [actorID, depth] = actorQueue.front();
actorQueue.pop_front();
if (spawnInfo.count(actorID)) {
for (const auto childID : spawnInfo.at(actorID)) {
actorQueue.push_front({ childID, depth + 1 });
}
}
if (actorID == 0) {
continue;
}
stream << std::string(INDENT * depth, ' ') << '(' << std::setw(12) << actorID << ") " << actorNames.at(actorID)
<< (actorID == decoded.currentRunningActor ? " <ACTIVE>" : "") << std::endl;
}
}
void dumpActorContextStack(std::ostream& stream,
const DecodedActorContext& decoded,
const std::unordered_map<UID, std::string>& identifierToActor) {
for (const auto& [actorID, actorIdentifier, spawnerActorID] : decoded.context) {
const std::string& actorName = identifierToActor.at(actorIdentifier);
stream << std::setw(12) << actorID << " " << actorName
<< (actorID == decoded.currentRunningActor ? " <ACTIVE>" : "") << std::endl;
}
}
void decodeClass(std::ostream& stream,
const std::string& classIdentifier,
const std::unordered_map<UID, std::string>& identifierToActor) {
UID uid = UID::fromString(classIdentifier);
stream << classIdentifier << " -- " << identifierToActor.at(uid) << std::endl;
}
namespace bpo = boost::program_options;
int main(int argc, char* argv[]) {
bpo::options_description desc("Options");
desc.add_options()("help",
"Print help message")("fdb-build-directory", bpo::value<std::string>(), "Build directory")(
"decode-class", bpo::value<std::string>(), "Decode a class key");
bpo::variables_map varMap;
bpo::store(bpo::parse_command_line(argc, argv, desc), varMap);
bpo::notify(varMap);
if (varMap.count("help")) {
std::cerr << desc << std::endl;
return 1;
}
std::string buildDirectory = ".";
if (varMap.count("fdb-build-directory") != 0) {
buildDirectory = varMap["fdb-build-directory"].as<std::string>();
}
const auto lib = loadUIDActorMapping(buildDirectory);
if (varMap.count("decode-class") != 0) {
decodeClass(std::cout, varMap["decode-class"].as<std::string>(), lib);
return 0;
}
std::string encodedContext;
while (std::cin) {
std::string line;
std::getline(std::cin, line);
boost::trim(line);
// libb64 will generate newline every 72 characters, which will be encoded as "\0xa"
// in the TraceEvent log file.
boost::replace_all(line, "\\x0a", "\n");
encodedContext += line;
}
const auto decodedActorContext = decodeActorContext(encodedContext);
switch (decodedActorContext.dumpType) {
case ActorContextDumpType::FULL_CONTEXT:
dumpActorContextTree(std::cout, decodedActorContext, lib);
break;
case ActorContextDumpType::CURRENT_STACK:
case ActorContextDumpType::CURRENT_CALL_BACKTRACE:
dumpActorContextStack(std::cout, decodedActorContext, lib);
break;
default:
std::cerr << "Unexpected ActorContextDumpType: " << static_cast<uint8_t>(decodedActorContext.dumpType)
<< std::endl;
return -1;
}
return 0;
}
#else // WITH_ACAC
#include <iostream>
int main(int argcc, char* argv[]) {
std::cerr << "FoundationDB is built without ACAC enabled" << std::endl;
return -1;
}
#endif // WITH_ACAC

View File

@ -18,11 +18,12 @@
* limitations under the License.
*/
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using System.Security.Cryptography;
namespace actorcompiler
{
@ -320,6 +321,7 @@ namespace actorcompiler
int chooseGroups = 0, whenCount = 0;
string This;
bool generateProbes;
public Dictionary<(ulong, ulong), string> uidObjects { get; private set; }
public ActorCompiler(Actor actor, string sourceFile, bool isTopLevel, bool lineNumbersEnabled, bool generateProbes)
{
@ -328,9 +330,104 @@ namespace actorcompiler
this.isTopLevel = isTopLevel;
this.LineNumbersEnabled = lineNumbersEnabled;
this.generateProbes = generateProbes;
this.uidObjects = new Dictionary<(ulong, ulong), string>();
FindState();
}
private ulong ByteToLong(byte[] bytes) {
// NOTE: Always assume big endian.
ulong result = 0;
foreach(var b in bytes) {
result += b;
result <<= 8;
}
return result;
}
// Generates the identifier for the ACTOR
private Tuple<ulong, ulong> GetUidFromString(string str) {
byte[] sha256Hash = SHA256.Create().ComputeHash(Encoding.UTF8.GetBytes(str));
byte[] first = sha256Hash.Take(8).ToArray();
byte[] second = sha256Hash.Skip(8).Take(8).ToArray();
return new Tuple<ulong, ulong>(ByteToLong(first), ByteToLong(second));
}
// Writes the function that returns the Actor object
private void WriteActorFunction(TextWriter writer, string fullReturnType) {
WriteTemplate(writer);
LineNumber(writer, actor.SourceLine);
foreach (string attribute in actor.attributes) {
writer.Write(attribute + " ");
}
if (actor.isStatic) writer.Write("static ");
writer.WriteLine("{0} {3}{1}( {2} ) {{", fullReturnType, actor.name, string.Join(", ", ParameterList()), actor.nameSpace==null ? "" : actor.nameSpace + "::");
LineNumber(writer, actor.SourceLine);
string newActor = string.Format("new {0}({1})",
fullClassName,
string.Join(", ", actor.parameters.Select(p => p.name).ToArray()));
if (actor.returnType != null)
writer.WriteLine("\treturn Future<{1}>({0});", newActor, actor.returnType);
else
writer.WriteLine("\t{0};", newActor);
writer.WriteLine("}");
}
// Writes the class of the Actor object
private void WriteActorClass(TextWriter writer, string fullStateClassName, Function body) {
// The final actor class mixes in the State class, the Actor base class and all callback classes
writer.WriteLine("// This generated class is to be used only via {0}()", actor.name);
WriteTemplate(writer);
LineNumber(writer, actor.SourceLine);
string callback_base_classes = string.Join(", ", callbacks.Select(c=>string.Format("public {0}", c.type)));
if (callback_base_classes != "") callback_base_classes += ", ";
writer.WriteLine("class {0} final : public Actor<{2}>, {3}public FastAllocated<{1}>, public {4} {{",
className,
fullClassName,
actor.returnType == null ? "void" : actor.returnType,
callback_base_classes,
fullStateClassName
);
writer.WriteLine("public:");
writer.WriteLine("\tusing FastAllocated<{0}>::operator new;", fullClassName);
writer.WriteLine("\tusing FastAllocated<{0}>::operator delete;", fullClassName);
var actorIdentifierKey = this.sourceFile + ":" + this.actor.name;
var actorIdentifier = GetUidFromString(actorIdentifierKey);
uidObjects.Add((actorIdentifier.Item1, actorIdentifier.Item2), actorIdentifierKey);
// NOTE UL is required as a u64 postfix for large integers, otherwise Clang would complain
writer.WriteLine("\tstatic constexpr ActorIdentifier __actorIdentifier = UID({0}UL, {1}UL);", actorIdentifier.Item1, actorIdentifier.Item2);
writer.WriteLine("\tActiveActorHelper activeActorHelper;");
writer.WriteLine("#pragma clang diagnostic push");
writer.WriteLine("#pragma clang diagnostic ignored \"-Wdelete-non-virtual-dtor\"");
if (actor.returnType != null)
writer.WriteLine(@" void destroy() override {{
activeActorHelper.~ActiveActorHelper();
static_cast<Actor<{0}>*>(this)->~Actor();
operator delete(this);
}}", actor.returnType);
else
writer.WriteLine(@" void destroy() {{
activeActorHelper.~ActiveActorHelper();
static_cast<Actor<void>*>(this)->~Actor();
operator delete(this);
}}");
writer.WriteLine("#pragma clang diagnostic pop");
foreach (var cb in callbacks)
writer.WriteLine("friend struct {0};", cb.type);
LineNumber(writer, actor.SourceLine);
WriteConstructor(body, writer, fullStateClassName);
WriteCancelFunc(writer);
writer.WriteLine("};");
}
public void Write(TextWriter writer)
{
string fullReturnType =
@ -349,10 +446,13 @@ namespace actorcompiler
break;
}
// e.g. SimpleTimerActor
fullClassName = className + GetTemplateActuals();
var actorClassFormal = new VarDeclaration { name = className, type = "class" };
This = string.Format("static_cast<{0}*>(this)", actorClassFormal.name);
// e.g. SimpleTimerActorState
stateClassName = className + "State";
// e.g. SimpleTimerActorState<SimpleTimerActor>
var fullStateClassName = stateClassName + GetTemplateActuals(new VarDeclaration { type = "class", name = fullClassName });
if (actor.isForwardDeclaration) {
@ -414,59 +514,11 @@ namespace actorcompiler
}
writer.WriteLine("};");
// The final actor class mixes in the State class, the Actor base class and all callback classes
writer.WriteLine("// This generated class is to be used only via {0}()", actor.name);
WriteTemplate(writer);
LineNumber(writer, actor.SourceLine);
WriteActorClass(writer, fullStateClassName, body);
string callback_base_classes = string.Join(", ", callbacks.Select(c=>string.Format("public {0}", c.type)));
if (callback_base_classes != "") callback_base_classes += ", ";
writer.WriteLine("class {0} final : public Actor<{2}>, {3}public FastAllocated<{1}>, public {4} {{",
className,
fullClassName,
actor.returnType == null ? "void" : actor.returnType,
callback_base_classes,
fullStateClassName
);
writer.WriteLine("public:");
writer.WriteLine("\tusing FastAllocated<{0}>::operator new;", fullClassName);
writer.WriteLine("\tusing FastAllocated<{0}>::operator delete;", fullClassName);
if (isTopLevel && actor.nameSpace == null) writer.WriteLine("} // namespace"); // namespace
writer.WriteLine("#pragma clang diagnostic push");
writer.WriteLine("#pragma clang diagnostic ignored \"-Wdelete-non-virtual-dtor\"");
if (actor.returnType != null)
writer.WriteLine("\tvoid destroy() override {{ ((Actor<{0}>*)this)->~Actor(); operator delete(this); }}", actor.returnType);
else
writer.WriteLine("\tvoid destroy() {{ ((Actor<void>*)this)->~Actor(); operator delete(this); }}");
writer.WriteLine("#pragma clang diagnostic pop");
foreach (var cb in callbacks)
writer.WriteLine("friend struct {0};", cb.type);
LineNumber(writer, actor.SourceLine);
WriteConstructor(body, writer, fullStateClassName);
//WriteStartFunc(body, writer);
WriteCancelFunc(writer);
writer.WriteLine("};");
if (isTopLevel && actor.nameSpace == null) writer.WriteLine("}"); // namespace
WriteTemplate(writer);
LineNumber(writer, actor.SourceLine);
foreach (string attribute in actor.attributes) {
writer.Write(attribute + " ");
}
if (actor.isStatic) writer.Write("static ");
writer.WriteLine("{0} {3}{1}( {2} ) {{", fullReturnType, actor.name, string.Join(", ", ParameterList()), actor.nameSpace==null ? "" : actor.nameSpace + "::");
LineNumber(writer, actor.SourceLine);
string newActor = string.Format("new {0}({1})",
fullClassName,
string.Join(", ", actor.parameters.Select(p => p.name).ToArray()));
if (actor.returnType != null)
writer.WriteLine("\treturn Future<{1}>({0});", newActor, actor.returnType);
else
writer.WriteLine("\t{0};", newActor);
writer.WriteLine("}");
WriteActorFunction(writer, fullReturnType);
if (actor.testCaseParameters != null)
{
@ -482,6 +534,11 @@ namespace actorcompiler
if (generateProbes) {
fun.WriteLine("fdb_probe_actor_enter(\"{0}\", {1}, {2});", name, thisAddress, index);
}
var blockIdentifier = GetUidFromString(fun.name);
fun.WriteLine("#ifdef WITH_ACAC");
fun.WriteLine("static constexpr ActorBlockIdentifier __identifier = UID({0}UL, {1}UL);", blockIdentifier.Item1, blockIdentifier.Item2);
fun.WriteLine("ActorExecutionContextHelper __helper(static_cast<{0}*>(this)->activeActorHelper.actorID, __identifier);", className);
fun.WriteLine("#endif // WITH_ACAC");
}
void ProbeExit(Function fun, string name, int index = -1) {
@ -1291,20 +1348,29 @@ namespace actorcompiler
endIsUnreachable = true,
publicName = true
};
// Initializes class member variables
constructor.Indent(codeIndent);
constructor.WriteLine( " : Actor<" + (actor.returnType == null ? "void" : actor.returnType) + ">()," );
constructor.WriteLine( " {0}({1})", fullStateClassName, string.Join(", ", actor.parameters.Select(p => p.name)));
constructor.WriteLine( " {0}({1}),", fullStateClassName, string.Join(", ", actor.parameters.Select(p => p.name)));
constructor.WriteLine( " activeActorHelper(__actorIdentifier)");
constructor.Indent(-1);
constructor.WriteLine("{");
constructor.Indent(+1);
ProbeEnter(constructor, actor.name);
constructor.WriteLine("#ifdef ENABLE_SAMPLING");
constructor.WriteLine("this->lineage.setActorName(\"{0}\");", actor.name);
constructor.WriteLine("LineageScope _(&this->lineage);");
constructor.WriteLine("#endif");
// constructor.WriteLine("getCurrentLineage()->modify(&StackLineage::actorName) = \"{0}\"_sr;", actor.name);
constructor.WriteLine("#endif");
constructor.WriteLine("this->{0};", body.call());
ProbeExit(constructor, actor.name);
WriteFunction(writer, constructor, constructor.BodyText);
}

View File

@ -1,4 +1,4 @@
/*
/*;
* ActorParser.cs
*
* This source file is part of the FoundationDB open source project
@ -18,10 +18,9 @@
* limitations under the License.
*/
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
namespace actorcompiler
@ -242,12 +241,14 @@ namespace actorcompiler
string sourceFile;
ErrorMessagePolicy errorMessagePolicy;
public bool generateProbes;
public Dictionary<(ulong, ulong), string> uidObjects { get; private set; }
public ActorParser(string text, string sourceFile, ErrorMessagePolicy errorMessagePolicy, bool generateProbes)
{
this.sourceFile = sourceFile;
this.errorMessagePolicy = errorMessagePolicy;
this.generateProbes = generateProbes;
this.uidObjects = new Dictionary<(ulong, ulong), string>();
tokens = Tokenize(text).Select(t=>new Token{ Value=t }).ToArray();
CountParens();
//if (sourceFile.EndsWith(".h")) LineNumbersEnabled = false;
@ -340,15 +341,17 @@ namespace actorcompiler
}
if (tokens[i].Value == "ACTOR" || tokens[i].Value == "SWIFT_ACTOR" || tokens[i].Value == "TEST_CASE")
{
int end;
var actor = ParseActor(i, out end);
var actor = ParseActor(i, out int end);
if (classContextStack.Count > 0)
{
actor.enclosingClass = String.Join("::", classContextStack.Reverse().Select(t => t.name));
}
var actorWriter = new System.IO.StringWriter();
actorWriter.NewLine = "\n";
new ActorCompiler(actor, sourceFile, inBlocks == 0, LineNumbersEnabled, generateProbes).Write(actorWriter);
var actorCompiler = new ActorCompiler(actor, sourceFile, inBlocks == 0, LineNumbersEnabled, generateProbes);
actorCompiler.Write(actorWriter);
actorCompiler.uidObjects.ToList().ForEach(x => this.uidObjects.TryAdd(x.Key, x.Value));
string[] actorLines = actorWriter.ToString().Split('\n');
bool hasLineNumber = false;

View File

@ -19,15 +19,22 @@
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
namespace actorcompiler
{
class Program
{
private static void OverwriteByMove(string target, string temporaryFile) {
if (File.Exists(target))
{
File.SetAttributes(target, FileAttributes.Normal);
File.Delete(target);
}
File.Move(temporaryFile, target);
File.SetAttributes(target, FileAttributes.ReadOnly);
}
public static int Main(string[] args)
{
bool generateProbes = false;
@ -38,7 +45,7 @@ namespace actorcompiler
return 100;
}
Console.WriteLine("actorcompiler {0}", string.Join(" ", args));
string input = args[0], output = args[1], outputtmp = args[1] + ".tmp";
string input = args[0], output = args[1], outputtmp = args[1] + ".tmp", outputUid = args[1] + ".uid";
ErrorMessagePolicy errorMessagePolicy = new ErrorMessagePolicy();
foreach (var arg in args) {
if (arg.StartsWith("--")) {
@ -52,15 +59,20 @@ namespace actorcompiler
try
{
var inputData = File.ReadAllText(input);
using (var outputStream = new StreamWriter(outputtmp))
new ActorParser(inputData, input.Replace('\\', '/'), errorMessagePolicy, generateProbes).Write(outputStream, output.Replace('\\', '/'));
if (File.Exists(output))
{
File.SetAttributes(output, FileAttributes.Normal);
File.Delete(output);
var parser = new ActorParser(inputData, input.Replace('\\', '/'), errorMessagePolicy, generateProbes);
using (var outputStream = new StreamWriter(outputtmp)) {
parser.Write(outputStream, output.Replace('\\', '/'));
}
File.Move(outputtmp, output);
File.SetAttributes(output, FileAttributes.ReadOnly);
OverwriteByMove(output, outputtmp);
using (var outputStream = new StreamWriter(outputtmp)) {
foreach(var entry in parser.uidObjects) {
outputStream.WriteLine("{0}|{1}|{2}", entry.Key.Item1, entry.Key.Item2, entry.Value);
}
}
OverwriteByMove(outputUid, outputtmp);
return 0;
}
catch (actorcompiler.Error e)

View File

@ -0,0 +1,137 @@
/*
* ActorContext.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2023 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FLOW_ACTOR_CONTEXT_H
#define FLOW_ACTOR_CONTEXT_H
#ifdef WITH_ACAC
#include <cstdint>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>
#include "flow/FastAlloc.h"
#include "flow/FastRef.h"
#include "flow/IRandom.h"
using ActorIdentifier = UID;
using ActorID = uint64_t;
static constexpr ActorID INVALID_ACTOR_ID = std::numeric_limits<ActorID>::max();
static constexpr ActorID INIT_ACTOR_ID = 0;
struct ActiveActor {
ActorIdentifier identifier = ActorIdentifier();
ActorID id = ActorID();
double spawnTime = double();
ActorID spawner = INVALID_ACTOR_ID;
ActiveActor();
explicit ActiveActor(const ActorIdentifier& identifier_,
const ActorID& id_,
const ActorID& spawnerID_ = INVALID_ACTOR_ID);
template <typename Ar>
void serialize(Ar& ar) {
serializer(ar, identifier, id, spawnTime, spawner);
}
};
using ActorBlockIdentifier = UID;
struct ActorExecutionContext {
ActorID actorID;
ActorBlockIdentifier blockIdentifier;
explicit ActorExecutionContext(const ActorID actorID_, const ActorBlockIdentifier blockIdentifier_)
: actorID(actorID_), blockIdentifier(blockIdentifier_) {}
};
// Dumps the current ACTORs to a given stream
extern void dumpActors(std::ostream& stream);
// A helper class that register/unregister the Actor
class ActiveActorHelper {
public:
ActorID actorID;
ActiveActorHelper(const ActorIdentifier& actorIdentifier);
~ActiveActorHelper();
};
class ActorExecutionContextHelper {
public:
ActorExecutionContextHelper(const ActorID& actorID_, const ActorBlockIdentifier& blockIdentifier_);
~ActorExecutionContextHelper();
};
enum class ActorContextDumpType : uint8_t {
FULL_CONTEXT,
CURRENT_STACK,
CURRENT_CALL_BACKTRACE,
};
// Encode the current actor context into a string
extern std::string encodeActorContext(const ActorContextDumpType dumpType = ActorContextDumpType::FULL_CONTEXT);
struct DecodedActorContext {
struct ActorInfo {
ActorID id;
ActorIdentifier identifier;
ActorID spawnerID;
ActorInfo(const ActorID& _id, const ActorIdentifier& _identifier, const ActorID& _spawnerID)
: id(_id), identifier(_identifier), spawnerID(_spawnerID) {}
};
ActorID currentRunningActor;
std::vector<ActorInfo> context;
ActorContextDumpType dumpType;
};
// Decode the serialized actor context to DecodedActorContext
DecodedActorContext decodeActorContext(const std::string& caller);
#else // WITH_ACAC
#include <memory>
#include "flow/IRandom.h"
using ActorID = uint64_t;
using ActorIdentifier = UID;
using ActorBlockIdentifier = UID;
struct ActorExecutionContext {};
struct ActiveActor {};
struct ActiveActorHelper {
ActiveActorHelper(const ActorIdentifier&) {}
};
struct ActorExecutionContextHelper {
ActorExecutionContextHelper(const ActorID&, const ActorBlockIdentifier&);
};
#endif // WITH_ACAC
#endif // FLOW_ACTOR_CONTEXT_H

View File

@ -23,10 +23,10 @@
#pragma once
#include "flow/BooleanParam.h"
#include "flow/Error.h"
#include "flow/FastAlloc.h"
#include "flow/FastRef.h"
#include "flow/Error.h"
#include "flow/Trace.h"
#include "flow/IRandom.h"
#include "flow/ObjectSerializerTraits.h"
#include "flow/FileIdentifier.h"
#include "flow/swift_support.h"

View File

@ -26,11 +26,13 @@
#include <map>
#include <boost/preprocessor/facilities/is_empty.hpp>
#include <boost/preprocessor/control/if.hpp>
#include "flow/ActorContext.h"
#include "flow/Platform.h"
#include "flow/Knobs.h"
#include "flow/FileIdentifier.h"
#include "flow/ObjectSerializerTraits.h"
#include "flow/Traceable.h"
#include <iostream>
enum { invalid_error_code = 0xffff };
@ -57,7 +59,7 @@ public:
serializer(ar, error_code);
}
Error() : error_code(invalid_error_code), flags(0) {}
Error();
explicit Error(int error_code);
static void init();

View File

@ -22,7 +22,6 @@
#define FLOW_FASTALLOC_H
#pragma once
#include "flow/Error.h"
#include "flow/Platform.h"
#include "flow/config.h"

View File

@ -65,6 +65,8 @@ public:
virtual void addref() = 0;
virtual void delref() = 0;
virtual StringRef getClassName() { return "IAsyncFile"_sr; }
// For read() and write(), the data buffer must remain valid until the future is ready
virtual Future<int> read(void* data,
int length,

View File

@ -93,8 +93,8 @@ public:
static UID fromStringThrowsOnFailure(std::string const&);
template <class Ar>
void serialize_unversioned(
Ar& ar) { // Changing this serialization format will affect key definitions, so can't simply be versioned!
void serialize_unversioned(Ar& ar) {
// Changing this serialization format will affect key definitions, so can't simply be versioned!
serializer(ar, part[0], part[1]);
}
};

View File

@ -26,13 +26,13 @@
#include "flow/Traceable.h"
#include "flow/FileIdentifier.h"
#include "flow/Error.h"
#include "flow/swift_support.h"
#ifdef WITH_SWIFT
#include <swift/bridging>
#endif
class Arena;
class Void;
// Optional is a wrapper for std::optional. There
// are two primary reasons to use this wrapper instead

View File

@ -20,6 +20,7 @@
#ifndef FLOW_FLOW_H
#define FLOW_FLOW_H
#include "flow/ActorContext.h"
#include "flow/Arena.h"
#include "flow/FastRef.h"
#pragma once
@ -31,25 +32,29 @@
#pragma warning(error : 4239)
#endif
#include <vector>
#include <queue>
#include <functional>
#include <string_view>
#include <utility>
#include <algorithm>
#include <iosfwd>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <stack>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <vector>
#include "flow/CodeProbe.h"
#include "flow/Platform.h"
#include "flow/FastAlloc.h"
#include "flow/IRandom.h"
#include "flow/serialize.h"
#include "flow/Deque.h"
#include "flow/ThreadPrimitives.h"
#include "flow/network.h"
#include "flow/Error.h"
#include "flow/FastAlloc.h"
#include "flow/FileIdentifier.h"
#include "flow/IRandom.h"
#include "flow/Platform.h"
#include "flow/ThreadPrimitives.h"
#include "flow/WriteOnlySet.h"
#include "flow/network.h"
#include "flow/serialize.h"
#ifdef WITH_SWIFT
#include <swift/bridging>
@ -58,7 +63,7 @@
// without relying on any imported Swift types.
#ifndef SWIFT_HIDE_CHECKED_CONTINUTATION
#include "SwiftModules/Flow_CheckedContinuation.h"
#endif /* SWIFT_HIDE_CHECKED_CONTINUATION */
#endif /* SWIFT_HIDE_CHECKED_CONTINUATION */
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wnullability-completeness"
@ -953,15 +958,12 @@ public:
using AssociatedFuture = Future<T>;
private:
SwiftCC continuationInstance;
SwiftCC continuationInstance;
public:
FlowCallbackForSwiftContinuation() :
continuationInstance(SwiftCC::init()) {}
FlowCallbackForSwiftContinuation() : continuationInstance(SwiftCC::init()) {}
void set(const void * _Nonnull pointerToContinuationInstance,
Future<T> f,
const void * _Nonnull thisPointer) {
void set(const void* _Nonnull pointerToContinuationInstance, Future<T> f, const void* _Nonnull thisPointer) {
// Verify Swift did not make a copy of the `self` value for this method
// call.
assert(this == thisPointer);
@ -969,16 +971,16 @@ public:
// FIXME: Propagate `SwiftCC` to Swift using forward
// interop, without relying on passing it via a `void *`
// here. That will let us avoid this hack.
const void *_Nonnull opaqueStorage = pointerToContinuationInstance;
static_assert(sizeof(SwiftCC) == sizeof(const void *));
const SwiftCC ccCopy(*reinterpret_cast<const SwiftCC *>(&opaqueStorage));
const void* _Nonnull opaqueStorage = pointerToContinuationInstance;
static_assert(sizeof(SwiftCC) == sizeof(const void*));
const SwiftCC ccCopy(*reinterpret_cast<const SwiftCC*>(&opaqueStorage));
// Set the continuation instance.
continuationInstance.set(ccCopy);
// Add this callback to the future.
f.addCallbackAndClear(this);
}
void fire(const T &value) override {
void fire(const T& value) override {
Callback<T>::remove();
Callback<T>::next = nullptr;
continuationInstance.resume(value);
@ -996,14 +998,13 @@ public:
#endif /* WITH_SWIFT*/
template <class T>
class
SWIFT_SENDABLE
class SWIFT_SENDABLE
#ifndef SWIFT_HIDE_CHECKED_CONTINUTATION
#ifdef WITH_SWIFT
SWIFT_CONFORMS_TO_PROTOCOL(flow_swift.FlowFutureOps)
#endif
#endif
Future {
Future {
public:
using Element = T;
#ifdef WITH_SWIFT
@ -1121,18 +1122,15 @@ public:
sav->send(std::forward<U>(value));
}
// Swift can't call method that takes in a universal references (U&&),
// so provide a callable `send` method that copies the value.
void sendCopy(const T& valueCopy) const SWIFT_NAME(send(_:)) {
sav->send(valueCopy);
}
// Swift can't call method that takes in a universal references (U&&),
// so provide a callable `send` method that copies the value.
void sendCopy(const T& valueCopy) const SWIFT_NAME(send(_:)) { sav->send(valueCopy); }
template <class E>
void sendError(const E& exc) const {
sav->sendError(exc);
}
SWIFT_CXX_IMPORT_UNSAFE Future<T> getFuture() const {
sav->addFutureRef();
return Future<T>(sav);
@ -1352,15 +1350,14 @@ public:
bool operator==(const FutureStream& rhs) { return rhs.queue == queue; }
bool operator!=(const FutureStream& rhs) { return rhs.queue != queue; }
// FIXME: remove annotation after https://github.com/apple/swift/issues/64316 is fixed.
T pop() __attribute__((swift_attr("import_unsafe"))) { return queue->pop(); }
// FIXME: remove annotation after https://github.com/apple/swift/issues/64316 is fixed.
T pop() __attribute__((swift_attr("import_unsafe"))) { return queue->pop(); }
Error getError() const {
ASSERT(queue->isError());
return queue->error;
}
explicit FutureStream(NotifiedQueue<T>* queue) : queue(queue) {
}
explicit FutureStream(NotifiedQueue<T>* queue) : queue(queue) {}
private:
NotifiedQueue<T>* queue;
@ -1402,15 +1399,9 @@ public:
// stream.send( request )
// Unreliable at most once delivery: Delivers request unless there is a connection failure (zero or one times)
void send(const T& value) {
queue->send(value);
}
void sendCopy(T value) {
queue->send(value);
}
void send(T&& value) {
queue->send(std::move(value));
}
void send(const T& value) { queue->send(value); }
void sendCopy(T value) { queue->send(value); }
void send(T&& value) { queue->send(std::move(value)); }
void sendError(const Error& error) { queue->sendError(error); }
// stream.getReply( request )
@ -1443,7 +1434,7 @@ public:
// Not const, because this function gives mutable
// access to queue
SWIFT_CXX_IMPORT_UNSAFE FutureStream<T> getFuture() {
SWIFT_CXX_IMPORT_UNSAFE FutureStream<T> getFuture() {
queue->addFutureRef();
return FutureStream<T>(queue);
}