From 91ec1fdf10a3ff8676bb478d4803be8e61f1a85c Mon Sep 17 00:00:00 2001 From: Xiaoge Su Date: Tue, 20 Jun 2023 16:26:48 -0700 Subject: [PATCH] Provide actor call backtrace See design/AcAC.md --- CMakeLists.txt | 11 + cmake/CompileBoost.cmake | 23 +- .../BackupContainerAzureBlobStore.actor.cpp | 4 + .../fdbclient/AsyncFileS3BlobStore.actor.h | 4 + fdbrpc/include/fdbrpc/AsyncFileCached.actor.h | 2 + fdbrpc/include/fdbrpc/AsyncFileChaos.h | 2 + fdbrpc/include/fdbrpc/AsyncFileEIO.actor.h | 2 + fdbrpc/include/fdbrpc/AsyncFileEncrypted.h | 2 + fdbrpc/include/fdbrpc/AsyncFileKAIO.actor.h | 2 + .../fdbrpc/AsyncFileNonDurable.actor.h | 4 + .../include/fdbrpc/AsyncFileReadAhead.actor.h | 2 + .../include/fdbrpc/AsyncFileWinASIO.actor.h | 2 + fdbrpc/include/fdbrpc/AsyncFileWriteChecker.h | 2 + fdbrpc/sim2.actor.cpp | 3 + flow/ActorContext.cpp | 201 ++++++++++++++++++ flow/CMakeLists.txt | 21 +- flow/Error.cpp | 4 +- flow/MkCert.cpp | 1 + flow/Net2.actor.cpp | 2 +- flow/PKey.cpp | 5 +- flow/Trace.cpp | 4 + flow/acac.cpp | 169 +++++++++++++++ flow/actorcompiler/ActorCompiler.cs | 174 ++++++++++----- flow/actorcompiler/ActorParser.cs | 15 +- flow/actorcompiler/Program.cs | 36 ++-- flow/include/flow/ActorContext.h | 137 ++++++++++++ flow/include/flow/Arena.h | 4 +- flow/include/flow/Error.h | 4 +- flow/include/flow/FastAlloc.h | 1 - flow/include/flow/IAsyncFile.h | 2 + flow/include/flow/IRandom.h | 4 +- flow/include/flow/Optional.h | 2 +- flow/include/flow/flow.h | 81 ++++--- 33 files changed, 794 insertions(+), 138 deletions(-) create mode 100644 flow/ActorContext.cpp create mode 100644 flow/acac.cpp create mode 100644 flow/include/flow/ActorContext.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 6db6199c53..cc8f8234da 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -53,6 +53,15 @@ endif() set(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/bin) set(LIBRARY_OUTPUT_PATH ${PROJECT_BINARY_DIR}/lib) +option(WITH_ACAC "Enable actor stack recording" OFF) +if (WITH_ACAC) + message(STATUS "Build FoundationDB with AcAC support") + if (FDB_RELEASE OR FDB_RELEASE_CANDITATE) + message(FATAL_ERROR "ACAC will cause severe slowdown of the system and SHOULD not be enabled in Release.") + endif() + add_compile_definitions(WITH_ACAC) +endif() + ################################################################################ # Packages used for bindings ################################################################################ @@ -242,3 +251,5 @@ endif() print_components() message(STATUS "CPACK_COMPONENTS_ALL ${CPACK_COMPONENTS_ALL}") + + diff --git a/cmake/CompileBoost.cmake b/cmake/CompileBoost.cmake index b21aa3a8ec..35207b6b67 100644 --- a/cmake/CompileBoost.cmake +++ b/cmake/CompileBoost.cmake @@ -9,7 +9,7 @@ function(compile_boost) # Configure bootstrap command set(BOOTSTRAP_COMMAND "./bootstrap.sh") - set(BOOTSTRAP_LIBRARIES "context,filesystem,iostreams,system,serialization") + set(BOOTSTRAP_LIBRARIES "context,filesystem,iostreams,system,serialization,program_options") set(BOOST_CXX_COMPILER "${CMAKE_CXX_COMPILER}") # Can't build Boost with Intel compiler, use clang instead. @@ -88,12 +88,17 @@ function(compile_boost) "${BOOST_INSTALL_DIR}/lib/libboost_filesystem.a" "${BOOST_INSTALL_DIR}/lib/libboost_iostreams.a" "${BOOST_INSTALL_DIR}/lib/libboost_serialization.a" - "${BOOST_INSTALL_DIR}/lib/libboost_system.a") + "${BOOST_INSTALL_DIR}/lib/libboost_system.a" + "${BOOST_INSTALL_DIR}/lib/libboost_program_options.a") add_library(${COMPILE_BOOST_TARGET}_context STATIC IMPORTED) add_dependencies(${COMPILE_BOOST_TARGET}_context ${COMPILE_BOOST_TARGET}Project) set_target_properties(${COMPILE_BOOST_TARGET}_context PROPERTIES IMPORTED_LOCATION "${BOOST_INSTALL_DIR}/lib/libboost_context.a") + add_library(${COMPILE_BOOST_TARGET}_program_options STATIC IMPORTED) + add_dependencies(${COMPILE_BOOST_TARGET}_program_options ${COMPILE_BOOST_TARGET}Project) + set_target_properties(${COMPILE_BOOST_TARGET}_program_options PROPERTIES IMPORTED_LOCATION "${BOOST_INSTALL_DIR}/lib/libboost_program_options.a") + add_library(${COMPILE_BOOST_TARGET}_filesystem STATIC IMPORTED) add_dependencies(${COMPILE_BOOST_TARGET}_filesystem ${COMPILE_BOOST_TARGET}Project) set_target_properties(${COMPILE_BOOST_TARGET}_filesystem PROPERTIES IMPORTED_LOCATION "${BOOST_INSTALL_DIR}/lib/libboost_filesystem.a") @@ -138,11 +143,11 @@ set(Boost_USE_STATIC_LIBS ON) if (UNIX AND CMAKE_CXX_COMPILER_ID MATCHES "Clang$" AND USE_LIBCXX) list(APPEND CMAKE_PREFIX_PATH /opt/boost_1_78_0_clang) set(BOOST_HINT_PATHS /opt/boost_1_78_0_clang) - message(STATUS "Using Clang version of boost::context boost::filesystem and boost::iostreams") + message(STATUS "Using Clang version of boost") else () list(APPEND CMAKE_PREFIX_PATH /opt/boost_1_78_0) set(BOOST_HINT_PATHS /opt/boost_1_78_0) - message(STATUS "Using g++ version of boost::context boost::filesystem and boost::iostreams") + message(STATUS "Using g++ version of boost") endif () if(BOOST_ROOT) @@ -154,13 +159,16 @@ if(WIN32) # properly for config mode. So we use the old way on Windows # find_package(Boost 1.72.0 EXACT QUIET REQUIRED CONFIG PATHS ${BOOST_HINT_PATHS}) # I think depending on the cmake version this will cause weird warnings - find_package(Boost 1.78 COMPONENTS filesystem iostreams serialization system) + find_package(Boost 1.78 COMPONENTS filesystem iostreams serialization system program_options) add_library(boost_target INTERFACE) target_link_libraries(boost_target INTERFACE Boost::boost Boost::filesystem Boost::iostreams Boost::serialization Boost::system) + + add_library(boost_target_program_options INTERFACE) + target_link_libraries(boost_target_program_options INTERFACE Boost::boost Boost::program_options) return() endif() -find_package(Boost 1.78.0 EXACT QUIET COMPONENTS context filesystem iostreams serialization system CONFIG PATHS ${BOOST_HINT_PATHS}) +find_package(Boost 1.78.0 EXACT QUIET COMPONENTS context filesystem iostreams program_options serialization system CONFIG PATHS ${BOOST_HINT_PATHS}) set(FORCE_BOOST_BUILD OFF CACHE BOOL "Forces cmake to build boost and ignores any installed boost") # The precompiled boost silently broke in CI. While investigating, I considered extending @@ -179,6 +187,9 @@ set(FORCE_BOOST_BUILD OFF CACHE BOOL "Forces cmake to build boost and ignores an if(Boost_FOUND AND NOT FORCE_BOOST_BUILD) add_library(boost_target INTERFACE) target_link_libraries(boost_target INTERFACE Boost::boost Boost::context Boost::filesystem Boost::iostreams Boost::serialization Boost::system) + + add_library(boost_target_program_options INTERFACE) + target_link_libraries(boost_target_program_options INTERFACE Boost::boost Boost::program_options) elseif(WIN32) message(FATAL_ERROR "Could not find Boost") else() diff --git a/fdbclient/azure_backup/BackupContainerAzureBlobStore.actor.cpp b/fdbclient/azure_backup/BackupContainerAzureBlobStore.actor.cpp index 2579bdaea3..02d9a43174 100644 --- a/fdbclient/azure_backup/BackupContainerAzureBlobStore.actor.cpp +++ b/fdbclient/azure_backup/BackupContainerAzureBlobStore.actor.cpp @@ -60,6 +60,8 @@ public: std::shared_ptr client; public: + virtual StringRef getClassName() override { return "BackupContainerAzureBlobStoreImpl::ReadFile"_sr; } + ReadFile(AsyncTaskThread& asyncTaskThread, const std::string& containerName, const std::string& blobName, @@ -121,6 +123,8 @@ public: static constexpr size_t bufferLimit = 1 << 20; public: + virtual StringRef getClassName() override { return "BackupContainerAzureBlobStoreImpl::WriteFile"_sr; } + WriteFile(AsyncTaskThread& asyncTaskThread, const std::string& containerName, const std::string& blobName, diff --git a/fdbclient/include/fdbclient/AsyncFileS3BlobStore.actor.h b/fdbclient/include/fdbclient/AsyncFileS3BlobStore.actor.h index 3a17528358..67111ef620 100644 --- a/fdbclient/include/fdbclient/AsyncFileS3BlobStore.actor.h +++ b/fdbclient/include/fdbclient/AsyncFileS3BlobStore.actor.h @@ -60,6 +60,8 @@ public: void addref() override { ReferenceCounted::addref(); } void delref() override { ReferenceCounted::delref(); } + virtual StringRef getClassName() override { return "AsyncFileS3BlobStoreWrite"_sr; } + struct Part : ReferenceCounted { Part(int n, int minSize) : number(n), writer(content.getWriteBuffer(minSize), nullptr, Unversioned()), length(0) { @@ -266,6 +268,8 @@ public: void addref() override { ReferenceCounted::addref(); } void delref() override { ReferenceCounted::delref(); } + virtual StringRef getClassName() override { return "AsyncFileS3BlobStoreRead"_sr; } + Future read(void* data, int length, int64_t offset) override; Future write(void const* data, int length, int64_t offset) override { throw file_not_writable(); } diff --git a/fdbrpc/include/fdbrpc/AsyncFileCached.actor.h b/fdbrpc/include/fdbrpc/AsyncFileCached.actor.h index 72866f0a1d..8aebb92225 100644 --- a/fdbrpc/include/fdbrpc/AsyncFileCached.actor.h +++ b/fdbrpc/include/fdbrpc/AsyncFileCached.actor.h @@ -140,6 +140,8 @@ class AsyncFileCached final : public IAsyncFile, public ReferenceCounted> open(std::string filename, int flags, int mode) { //TraceEvent("AsyncFileCachedOpen").detail("Filename", filename); diff --git a/fdbrpc/include/fdbrpc/AsyncFileChaos.h b/fdbrpc/include/fdbrpc/AsyncFileChaos.h index 825f9e07f1..a63a5bfac6 100644 --- a/fdbrpc/include/fdbrpc/AsyncFileChaos.h +++ b/fdbrpc/include/fdbrpc/AsyncFileChaos.h @@ -44,6 +44,8 @@ public: void addref() override { ReferenceCounted::addref(); } void delref() override { ReferenceCounted::delref(); } + virtual StringRef getClassName() override { return "AsyncFileReadAheadCache"_sr; } + double getDelay() const { double delayFor = 0.0; if (!enabled) diff --git a/fdbrpc/include/fdbrpc/AsyncFileEIO.actor.h b/fdbrpc/include/fdbrpc/AsyncFileEIO.actor.h index a05e50a3c5..da180acad0 100644 --- a/fdbrpc/include/fdbrpc/AsyncFileEIO.actor.h +++ b/fdbrpc/include/fdbrpc/AsyncFileEIO.actor.h @@ -45,6 +45,8 @@ class AsyncFileEIO : public IAsyncFile, public ReferenceCounted { public: + virtual StringRef getClassName() override { return "AsyncFileDetachable"_sr; } + static void init() { eio_set_max_parallel(FLOW_KNOBS->EIO_MAX_PARALLELISM); if (eio_init(&eio_want_poll, nullptr)) { diff --git a/fdbrpc/include/fdbrpc/AsyncFileEncrypted.h b/fdbrpc/include/fdbrpc/AsyncFileEncrypted.h index 78c348d568..eb535ce8b3 100644 --- a/fdbrpc/include/fdbrpc/AsyncFileEncrypted.h +++ b/fdbrpc/include/fdbrpc/AsyncFileEncrypted.h @@ -35,6 +35,8 @@ class AsyncFileEncrypted : public IAsyncFile, public ReferenceCounted file; StreamCipher::IV firstBlockIV; diff --git a/fdbrpc/include/fdbrpc/AsyncFileKAIO.actor.h b/fdbrpc/include/fdbrpc/AsyncFileKAIO.actor.h index 4925990342..29ca8b7e3e 100644 --- a/fdbrpc/include/fdbrpc/AsyncFileKAIO.actor.h +++ b/fdbrpc/include/fdbrpc/AsyncFileKAIO.actor.h @@ -58,6 +58,8 @@ DESCR struct SlowAioSubmit { class AsyncFileKAIO final : public IAsyncFile, public ReferenceCounted { public: + virtual StringRef getClassName() override { return "AsyncFileKAIO"_sr; } + struct AsyncFileKAIOMetrics { LatencySample readLatencySample = { "AsyncFileKAIOReadLatency", UID(), diff --git a/fdbrpc/include/fdbrpc/AsyncFileNonDurable.actor.h b/fdbrpc/include/fdbrpc/AsyncFileNonDurable.actor.h index 32eecb83f0..3e2c98ba16 100644 --- a/fdbrpc/include/fdbrpc/AsyncFileNonDurable.actor.h +++ b/fdbrpc/include/fdbrpc/AsyncFileNonDurable.actor.h @@ -71,6 +71,8 @@ private: bool assertOnReadWriteCancel; public: + virtual StringRef getClassName() override { return "AsyncFileDetachable"_sr; } + explicit AsyncFileDetachable(Reference file) : file(file), assertOnReadWriteCancel(true) { shutdown = doShutdown(this); } @@ -103,6 +105,8 @@ public: // killed This is used to simulate a power failure which prevents all written data from being persisted to disk class AsyncFileNonDurable final : public IAsyncFile, public ReferenceCounted { public: + virtual StringRef getClassName() override { return "AsyncFileNonDurable"_sr; } + UID id; std::string filename; diff --git a/fdbrpc/include/fdbrpc/AsyncFileReadAhead.actor.h b/fdbrpc/include/fdbrpc/AsyncFileReadAhead.actor.h index 28819aba25..ca7ff166e8 100644 --- a/fdbrpc/include/fdbrpc/AsyncFileReadAhead.actor.h +++ b/fdbrpc/include/fdbrpc/AsyncFileReadAhead.actor.h @@ -39,6 +39,8 @@ public: void addref() override { ReferenceCounted::addref(); } void delref() override { ReferenceCounted::delref(); } + virtual StringRef getClassName() override { return "AsyncFileReadAheadCache"_sr; } + struct CacheBlock : ReferenceCounted { CacheBlock(int size = 0) : data(new uint8_t[size]), len(size) {} ~CacheBlock() { delete[] data; } diff --git a/fdbrpc/include/fdbrpc/AsyncFileWinASIO.actor.h b/fdbrpc/include/fdbrpc/AsyncFileWinASIO.actor.h index 168c257e00..577d82d73a 100644 --- a/fdbrpc/include/fdbrpc/AsyncFileWinASIO.actor.h +++ b/fdbrpc/include/fdbrpc/AsyncFileWinASIO.actor.h @@ -43,6 +43,8 @@ public: static void stop() {} + virtual StringRef getClassName() override { return "AsnycFileWinASIO"_sr; } + static bool should_poll() { return false; } // FIXME: This implementation isn't actually asynchronous - it just does operations synchronously! diff --git a/fdbrpc/include/fdbrpc/AsyncFileWriteChecker.h b/fdbrpc/include/fdbrpc/AsyncFileWriteChecker.h index b7251590f5..8fd59bcb96 100644 --- a/fdbrpc/include/fdbrpc/AsyncFileWriteChecker.h +++ b/fdbrpc/include/fdbrpc/AsyncFileWriteChecker.h @@ -30,6 +30,8 @@ public: void addref() override { ReferenceCounted::addref(); } void delref() override { ReferenceCounted::delref(); } + virtual StringRef getClassName() override { return "AsyncFileWriteChecker"_sr; } + // For read() and write(), the data buffer must remain valid until the future is ready Future read(void* data, int length, int64_t offset) override { // Lambda must hold a reference to this to keep it alive until after the read diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index a8dcfe9d9a..5006200b30 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -656,6 +656,8 @@ class SimpleFile : public IAsyncFile, public ReferenceCounted { public: static void init() {} + virtual StringRef getClassName() override { return "SimpleFile"_sr; } + static bool should_poll() { return false; } ACTOR static Future> open( @@ -2905,6 +2907,7 @@ Future> Sim2::createUDPSocket(bool isV6) { void startNewSimulator(bool printSimTime) { ASSERT(!g_network); + ASSERT(!g_simulator); g_network = g_simulator = new Sim2(printSimTime); g_simulator->connectionFailuresDisableDuration = deterministicRandom()->coinflip() ? 0 : DISABLE_CONNECTION_FAILURE_FOREVER; diff --git a/flow/ActorContext.cpp b/flow/ActorContext.cpp new file mode 100644 index 0000000000..30ee2a38be --- /dev/null +++ b/flow/ActorContext.cpp @@ -0,0 +1,201 @@ +#include "flow/ActorContext.h" + +#ifdef WITH_ACAC + +#include +#include + +#include "flow/flow.h" +#include "libb64/encode.h" +#include "libb64/decode.h" + +namespace { +std::vector g_currentExecutionContext; +std::unordered_map g_activeActors; + +ActorID getActorID() { + static thread_local ActorID actorID = INIT_ACTOR_ID; + return ++actorID; +} + +inline ActorID getActorSpawnerID() { + if (g_currentExecutionContext.empty()) { + return INIT_ACTOR_ID; + } + return g_currentExecutionContext.back().actorID; +} + +inline bool isActorOnMainThread() { + // The INetwork framework behaves differently in Net2 and Sim2. + // For Net2, when Net2::run() is called, the N2::thread_network is set to be the current Net2 instance. + // For Sim2, it tests if on main thread by calling the underlying Net2 instance, however, since Net2::run() + // is never called, the N2::thread_network will always be nullptr. In this case, Sim2::isOnMainThread will always + // return false and not reliable. + if (g_network) [[likely]] { + return g_network->isSimulated() ? true : g_network->isOnMainThread(); + } else { + return false; + } +} + +} // anonymous namespace + +using ActiveActorsCount_t = uint32_t; + +ActiveActor::ActiveActor() : identifier(), id(), spawnTime(0.0), spawner(INVALID_ACTOR_ID) {} + +ActiveActor::ActiveActor(const ActorIdentifier& identifier_, const ActorID& id_, const ActorID& spawnerID_) + : identifier(identifier_), id(id_), spawnTime(g_network != nullptr ? g_network->now() : 0.0), spawner(spawnerID_) {} + +ActiveActorHelper::ActiveActorHelper(const ActorIdentifier& actorIdentifier) { + if (!isActorOnMainThread()) [[unlikely]] { + return; + } + const auto actorID_ = getActorID(); + const auto spawnerActorID = getActorSpawnerID(); + actorID = actorID_; + g_activeActors[actorID] = ActiveActor(actorIdentifier, actorID, spawnerActorID); +} + +ActiveActorHelper::~ActiveActorHelper() { + if (!isActorOnMainThread()) [[unlikely]] { + return; + } + g_activeActors.erase(actorID); +} + +ActorExecutionContextHelper::ActorExecutionContextHelper(const ActorID& actorID_, + const ActorBlockIdentifier& blockIdentifier_) { + if (!isActorOnMainThread()) [[unlikely]] { + return; + } + g_currentExecutionContext.emplace_back(actorID_, blockIdentifier_); +} + +ActorExecutionContextHelper::~ActorExecutionContextHelper() { + if (!isActorOnMainThread()) [[unlikely]] { + return; + } + if (g_currentExecutionContext.empty()) [[unlikely]] { + // This should not happen, abort the program if it happens. + std::abort(); + } + g_currentExecutionContext.pop_back(); +} + +// TODO: Rewrite this function for better display +void dumpActors(std::ostream& stream) { + stream << "Current active ACTORs:" << std::endl; + for (const auto& [actorID, activeActor] : g_activeActors) { + stream << std::setw(10) << actorID << " " << activeActor.identifier.toString() << std::endl; + if (activeActor.spawner != INVALID_ACTOR_ID) { + stream << " Spawn by " << std::setw(10) << activeActor.spawner << std::endl; + } + } +} + +namespace { + +std::vector getCallBacktraceOfActor(const ActorID& actorID) { + std::vector actorBacktrace; + auto currentActorID = actorID; + for (;;) { + if (currentActorID == INIT_ACTOR_ID) { + // Reaching the root + break; + } + if (g_activeActors.count(currentActorID) == 0) { + // TODO: Understand why this happens and react properly + break; + } + actorBacktrace.push_back(g_activeActors.at(currentActorID)); + if (g_activeActors.at(currentActorID).spawner != INVALID_ACTOR_ID) { + currentActorID = g_activeActors.at(currentActorID).spawner; + } else { + // TODO: Understand why the actor has no spawner ID + break; + } + } + return actorBacktrace; +} + +} // anonymous namespace + +std::string encodeActorContext(const ActorContextDumpType dumpType) { + BinaryWriter writer(Unversioned()); + auto writeActorInfo = [&writer](const ActiveActor& actor) { + writer << actor.id << actor.identifier << actor.spawner; + }; + + writer << static_cast(dumpType) + << (g_currentExecutionContext.empty() ? INVALID_ACTOR_ID : g_currentExecutionContext.back().actorID); + + switch (dumpType) { + case ActorContextDumpType::FULL_CONTEXT: + writer << static_cast(g_activeActors.size()); + for (const auto& [actorID, activeActor] : g_activeActors) { + writeActorInfo(activeActor); + } + break; + case ActorContextDumpType::CURRENT_STACK: + // Only current call stack + { + if (g_currentExecutionContext.empty()) { + writer << static_cast(0); + break; + } + writer << static_cast(g_currentExecutionContext.size()); + for (const auto& context : g_currentExecutionContext) { + writeActorInfo(g_activeActors.at(context.actorID)); + } + } + break; + case ActorContextDumpType::CURRENT_CALL_BACKTRACE: + // The call backtrace of current active actor + { + if (g_currentExecutionContext.empty()) { + writer << static_cast(0); + break; + } + const auto actors = getCallBacktraceOfActor(g_currentExecutionContext.back().actorID); + writer << static_cast(actors.size()); + for (const auto& item : actors) { + writeActorInfo(item); + } + } + break; + default: + UNREACHABLE(); + } + + const std::string data = writer.toValue().toString(); + return base64::encoder::from_string(data); +} + +DecodedActorContext decodeActorContext(const std::string& caller) { + DecodedActorContext result; + const auto decoded = base64::decoder::from_string(caller); + BinaryReader reader(decoded, Unversioned()); + + std::underlying_type_t dumpTypeRaw; + reader >> dumpTypeRaw; + result.dumpType = static_cast(dumpTypeRaw); + + reader >> result.currentRunningActor; + + ActiveActorsCount_t actorCount; + reader >> actorCount; + + std::unordered_map> actors; + for (ActiveActorsCount_t i = 0; i < actorCount; ++i) { + ActorID id; + ActorID spawner; + ActorIdentifier identifier; + reader >> id >> identifier >> spawner; + result.context.emplace_back(id, identifier, spawner); + } + + return result; +} + +#endif // WITH_ACAC diff --git a/flow/CMakeLists.txt b/flow/CMakeLists.txt index bc87bd97ac..a15acc8537 100644 --- a/flow/CMakeLists.txt +++ b/flow/CMakeLists.txt @@ -12,8 +12,10 @@ if (FLOW_USE_ZSTD) endif() # Remove files with `main` defined so we can create a link test executable. +list(REMOVE_ITEM FLOW_SRCS LinkTest.cpp) list(REMOVE_ITEM FLOW_SRCS TLSTest.cpp) list(REMOVE_ITEM FLOW_SRCS MkCertCli.cpp) +list(REMOVE_ITEM FLOW_SRCS acac.cpp) if(CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64") list(APPEND FLOW_SRCS aarch64/memcmp.S aarch64/memcpy.S) @@ -71,7 +73,14 @@ if(APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "arm64") endif() foreach(ft flow flow_sampling flowlinktest) - target_include_directories(${ft} PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include") + target_include_directories( + ${ft} + PUBLIC + "${CMAKE_CURRENT_SOURCE_DIR}/include" + "${CMAKE_CURRENT_BINARY_DIR}/include" + "${CMAKE_SOURCE_DIR}/contrib/libb64/include" + ) + if (FLOW_USE_ZSTD) target_include_directories(${ft} PRIVATE SYSTEM ${ZSTD_LIB_INCLUDE_DIR}) endif() @@ -80,7 +89,7 @@ foreach(ft flow flow_sampling flowlinktest) endif() target_link_libraries(${ft} PRIVATE stacktrace) - target_link_libraries(${ft} PUBLIC fmt::fmt SimpleOpt crc32) + target_link_libraries(${ft} PUBLIC fmt::fmt SimpleOpt crc32 libb64) if(UNIX AND NOT APPLE) target_link_libraries(${ft} PRIVATE folly_memcpy) target_compile_definitions(${ft} PRIVATE WITH_FOLLY_MEMCPY) @@ -105,12 +114,10 @@ foreach(ft flow flow_sampling flowlinktest) if(USE_VALGRIND) target_link_libraries(${ft} PUBLIC Valgrind) endif() + target_link_libraries(${ft} PUBLIC OpenSSL::SSL) target_link_libraries(${ft} PUBLIC Threads::Threads ${CMAKE_DL_LIBS}) target_link_libraries(${ft} PUBLIC boost_target) - if(USE_VALGRIND) - target_link_libraries(${ft} PUBLIC Valgrind) - endif() if(APPLE) find_library(IO_KIT IOKit) @@ -119,6 +126,9 @@ foreach(ft flow flow_sampling flowlinktest) endif() endforeach() +add_executable(acac acac.cpp) +target_link_libraries(acac PUBLIC flow boost_target_program_options) + target_compile_definitions(flow_sampling PRIVATE -DENABLE_SAMPLING) if(WIN32) add_dependencies(flow_sampling_actors flow_actors) @@ -232,3 +242,4 @@ if (WITH_SWIFT) add_dependencies(flow flow_swift) add_dependencies(flow_sampling flow_swift) endif() # WITH SWIFT + diff --git a/flow/Error.cpp b/flow/Error.cpp index 375381205a..bf1f30e897 100644 --- a/flow/Error.cpp +++ b/flow/Error.cpp @@ -101,10 +101,12 @@ Error internal_error_impl(const char* a_nm, return Error(error_code_internal_error); } +Error::Error() : error_code(invalid_error_code), flags(0) {} + Error::Error(int error_code) : error_code(error_code), flags(0) { if (TRACE_SAMPLE()) TraceEvent(SevSample, "ErrorCreated").detail("ErrorCode", error_code); - // std::cout << "Error: " << error_code << std::endl; + if (error_code >= 3000 && error_code < 6000) { { TraceEvent te(SevError, "SystemError"); diff --git a/flow/MkCert.cpp b/flow/MkCert.cpp index b62f156421..6ddb86dbba 100644 --- a/flow/MkCert.cpp +++ b/flow/MkCert.cpp @@ -24,6 +24,7 @@ #include "flow/MkCert.h" #include "flow/PKey.h" #include "flow/ScopeExit.h" +#include "flow/Trace.h" #include #include diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index cb6518fcc4..76d797d75f 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -123,7 +123,7 @@ class Connection; // Outlives main Net2* g_net2 = nullptr; -thread_local INetwork* thread_network = 0; +thread_local INetwork* thread_network = nullptr; class Net2 final : public INetwork, public INetworkConnections { diff --git a/flow/PKey.cpp b/flow/PKey.cpp index 1424fc238b..c211b3a34c 100644 --- a/flow/PKey.cpp +++ b/flow/PKey.cpp @@ -18,9 +18,12 @@ * limitations under the License. */ +#include "flow/PKey.h" + #include "flow/AutoCPointer.h" #include "flow/Error.h" -#include "flow/PKey.h" +#include "flow/Trace.h" + #include #include #include diff --git a/flow/Trace.cpp b/flow/Trace.cpp index e121ea2134..ece3e24c29 100644 --- a/flow/Trace.cpp +++ b/flow/Trace.cpp @@ -18,6 +18,7 @@ * limitations under the License. */ +#include "flow/ActorContext.h" #include "flow/Trace.h" #include "flow/FileTraceLogWriter.h" #include "flow/Knobs.h" @@ -1337,6 +1338,9 @@ void BaseTraceEvent::log() { if (this->severity == SevError) { severity = SevInfo; backtrace(); +#ifdef WITH_ACAC + detail("ActorStack", encodeActorContext(ActorContextDumpType::CURRENT_CALL_BACKTRACE)); +#endif // WITH_ACAC severity = SevError; if (errorKindIndex != -1) { fields.mutate(errorKindIndex).second = toString(errorKind); diff --git a/flow/acac.cpp b/flow/acac.cpp new file mode 100644 index 0000000000..8d0dca0b2e --- /dev/null +++ b/flow/acac.cpp @@ -0,0 +1,169 @@ +#ifdef WITH_ACAC + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "flow/ActorContext.h" + +std::unordered_map loadUIDActorMapping(const std::string& build_directory_path) { + std::unordered_map identifierToActor; + + for (const auto& dirEntry : std::filesystem::recursive_directory_iterator(build_directory_path)) { + if (!dirEntry.is_regular_file()) { + continue; + } + if (dirEntry.path().extension() != ".uid") { + continue; + } + std::ifstream ifs(dirEntry.path()); + for (;;) { + std::string line; + if (!std::getline(ifs, line)) { + break; + } + + std::stringstream ss; + ss << line; + + uint64_t part1, part2; + char pipeChar; + std::string actorName; + ss >> part1 >> pipeChar >> part2 >> pipeChar >> actorName; + + identifierToActor[UID(part1, part2)] = actorName; + } + } + + return identifierToActor; +} + +void dumpActorContextTree(std::ostream& stream, + const DecodedActorContext& decoded, + const std::unordered_map& identifierToActor) { + + std::unordered_map> spawnInfo; + for (const auto& [actorID, _1, parentID] : decoded.context) { + spawnInfo[parentID].push_back(actorID); + } + + std::unordered_map actorNames; + for (const auto& [actorID, actorIdentifier, _1] : decoded.context) { + actorNames[actorID] = identifierToActor.at(actorIdentifier); + } + + // 2-space indentation + constexpr int INDENT = 2; + std::deque> actorQueue; + + actorQueue.push_back({ INIT_ACTOR_ID, 0 }); + + while (!actorQueue.empty()) { + const auto [actorID, depth] = actorQueue.front(); + actorQueue.pop_front(); + if (spawnInfo.count(actorID)) { + for (const auto childID : spawnInfo.at(actorID)) { + actorQueue.push_front({ childID, depth + 1 }); + } + } + if (actorID == 0) { + continue; + } + + stream << std::string(INDENT * depth, ' ') << '(' << std::setw(12) << actorID << ") " << actorNames.at(actorID) + << (actorID == decoded.currentRunningActor ? " " : "") << std::endl; + } +} + +void dumpActorContextStack(std::ostream& stream, + const DecodedActorContext& decoded, + const std::unordered_map& identifierToActor) { + for (const auto& [actorID, actorIdentifier, spawnerActorID] : decoded.context) { + const std::string& actorName = identifierToActor.at(actorIdentifier); + stream << std::setw(12) << actorID << " " << actorName + << (actorID == decoded.currentRunningActor ? " " : "") << std::endl; + } +} + +void decodeClass(std::ostream& stream, + const std::string& classIdentifier, + const std::unordered_map& identifierToActor) { + UID uid = UID::fromString(classIdentifier); + stream << classIdentifier << " -- " << identifierToActor.at(uid) << std::endl; +} + +namespace bpo = boost::program_options; + +int main(int argc, char* argv[]) { + bpo::options_description desc("Options"); + desc.add_options()("help", + "Print help message")("fdb-build-directory", bpo::value(), "Build directory")( + "decode-class", bpo::value(), "Decode a class key"); + + bpo::variables_map varMap; + bpo::store(bpo::parse_command_line(argc, argv, desc), varMap); + + bpo::notify(varMap); + + if (varMap.count("help")) { + std::cerr << desc << std::endl; + return 1; + } + + std::string buildDirectory = "."; + if (varMap.count("fdb-build-directory") != 0) { + buildDirectory = varMap["fdb-build-directory"].as(); + } + + const auto lib = loadUIDActorMapping(buildDirectory); + if (varMap.count("decode-class") != 0) { + decodeClass(std::cout, varMap["decode-class"].as(), lib); + return 0; + } + + std::string encodedContext; + while (std::cin) { + std::string line; + std::getline(std::cin, line); + boost::trim(line); + // libb64 will generate newline every 72 characters, which will be encoded as "\0xa" + // in the TraceEvent log file. + boost::replace_all(line, "\\x0a", "\n"); + encodedContext += line; + } + + const auto decodedActorContext = decodeActorContext(encodedContext); + switch (decodedActorContext.dumpType) { + case ActorContextDumpType::FULL_CONTEXT: + dumpActorContextTree(std::cout, decodedActorContext, lib); + break; + case ActorContextDumpType::CURRENT_STACK: + case ActorContextDumpType::CURRENT_CALL_BACKTRACE: + dumpActorContextStack(std::cout, decodedActorContext, lib); + break; + default: + std::cerr << "Unexpected ActorContextDumpType: " << static_cast(decodedActorContext.dumpType) + << std::endl; + return -1; + } + return 0; +} + +#else // WITH_ACAC + +#include + +int main(int argcc, char* argv[]) { + std::cerr << "FoundationDB is built without ACAC enabled" << std::endl; + return -1; +} + +#endif // WITH_ACAC diff --git a/flow/actorcompiler/ActorCompiler.cs b/flow/actorcompiler/ActorCompiler.cs index c23e7c4753..68b086becd 100644 --- a/flow/actorcompiler/ActorCompiler.cs +++ b/flow/actorcompiler/ActorCompiler.cs @@ -18,11 +18,12 @@ * limitations under the License. */ -using System; +using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.IO; +using System.Security.Cryptography; namespace actorcompiler { @@ -320,6 +321,7 @@ namespace actorcompiler int chooseGroups = 0, whenCount = 0; string This; bool generateProbes; + public Dictionary<(ulong, ulong), string> uidObjects { get; private set; } public ActorCompiler(Actor actor, string sourceFile, bool isTopLevel, bool lineNumbersEnabled, bool generateProbes) { @@ -328,9 +330,104 @@ namespace actorcompiler this.isTopLevel = isTopLevel; this.LineNumbersEnabled = lineNumbersEnabled; this.generateProbes = generateProbes; + this.uidObjects = new Dictionary<(ulong, ulong), string>(); FindState(); } + + private ulong ByteToLong(byte[] bytes) { + // NOTE: Always assume big endian. + ulong result = 0; + foreach(var b in bytes) { + result += b; + result <<= 8; + } + return result; + } + + // Generates the identifier for the ACTOR + private Tuple GetUidFromString(string str) { + byte[] sha256Hash = SHA256.Create().ComputeHash(Encoding.UTF8.GetBytes(str)); + byte[] first = sha256Hash.Take(8).ToArray(); + byte[] second = sha256Hash.Skip(8).Take(8).ToArray(); + return new Tuple(ByteToLong(first), ByteToLong(second)); + } + + // Writes the function that returns the Actor object + private void WriteActorFunction(TextWriter writer, string fullReturnType) { + WriteTemplate(writer); + LineNumber(writer, actor.SourceLine); + foreach (string attribute in actor.attributes) { + writer.Write(attribute + " "); + } + if (actor.isStatic) writer.Write("static "); + writer.WriteLine("{0} {3}{1}( {2} ) {{", fullReturnType, actor.name, string.Join(", ", ParameterList()), actor.nameSpace==null ? "" : actor.nameSpace + "::"); + LineNumber(writer, actor.SourceLine); + + string newActor = string.Format("new {0}({1})", + fullClassName, + string.Join(", ", actor.parameters.Select(p => p.name).ToArray())); + + if (actor.returnType != null) + writer.WriteLine("\treturn Future<{1}>({0});", newActor, actor.returnType); + else + writer.WriteLine("\t{0};", newActor); + writer.WriteLine("}"); + } + + // Writes the class of the Actor object + private void WriteActorClass(TextWriter writer, string fullStateClassName, Function body) { + // The final actor class mixes in the State class, the Actor base class and all callback classes + writer.WriteLine("// This generated class is to be used only via {0}()", actor.name); + WriteTemplate(writer); + LineNumber(writer, actor.SourceLine); + + string callback_base_classes = string.Join(", ", callbacks.Select(c=>string.Format("public {0}", c.type))); + if (callback_base_classes != "") callback_base_classes += ", "; + writer.WriteLine("class {0} final : public Actor<{2}>, {3}public FastAllocated<{1}>, public {4} {{", + className, + fullClassName, + actor.returnType == null ? "void" : actor.returnType, + callback_base_classes, + fullStateClassName + ); + writer.WriteLine("public:"); + writer.WriteLine("\tusing FastAllocated<{0}>::operator new;", fullClassName); + writer.WriteLine("\tusing FastAllocated<{0}>::operator delete;", fullClassName); + + var actorIdentifierKey = this.sourceFile + ":" + this.actor.name; + var actorIdentifier = GetUidFromString(actorIdentifierKey); + uidObjects.Add((actorIdentifier.Item1, actorIdentifier.Item2), actorIdentifierKey); + // NOTE UL is required as a u64 postfix for large integers, otherwise Clang would complain + writer.WriteLine("\tstatic constexpr ActorIdentifier __actorIdentifier = UID({0}UL, {1}UL);", actorIdentifier.Item1, actorIdentifier.Item2); + writer.WriteLine("\tActiveActorHelper activeActorHelper;"); + + writer.WriteLine("#pragma clang diagnostic push"); + writer.WriteLine("#pragma clang diagnostic ignored \"-Wdelete-non-virtual-dtor\""); + if (actor.returnType != null) + writer.WriteLine(@" void destroy() override {{ + activeActorHelper.~ActiveActorHelper(); + static_cast*>(this)->~Actor(); + operator delete(this); + }}", actor.returnType); + else + writer.WriteLine(@" void destroy() {{ + activeActorHelper.~ActiveActorHelper(); + static_cast*>(this)->~Actor(); + operator delete(this); + }}"); + writer.WriteLine("#pragma clang diagnostic pop"); + + foreach (var cb in callbacks) + writer.WriteLine("friend struct {0};", cb.type); + + LineNumber(writer, actor.SourceLine); + WriteConstructor(body, writer, fullStateClassName); + WriteCancelFunc(writer); + writer.WriteLine("};"); + + } + public void Write(TextWriter writer) { string fullReturnType = @@ -349,10 +446,13 @@ namespace actorcompiler break; } + // e.g. SimpleTimerActor fullClassName = className + GetTemplateActuals(); var actorClassFormal = new VarDeclaration { name = className, type = "class" }; This = string.Format("static_cast<{0}*>(this)", actorClassFormal.name); + // e.g. SimpleTimerActorState stateClassName = className + "State"; + // e.g. SimpleTimerActorState var fullStateClassName = stateClassName + GetTemplateActuals(new VarDeclaration { type = "class", name = fullClassName }); if (actor.isForwardDeclaration) { @@ -414,59 +514,11 @@ namespace actorcompiler } writer.WriteLine("};"); - // The final actor class mixes in the State class, the Actor base class and all callback classes - writer.WriteLine("// This generated class is to be used only via {0}()", actor.name); - WriteTemplate(writer); - LineNumber(writer, actor.SourceLine); + WriteActorClass(writer, fullStateClassName, body); - string callback_base_classes = string.Join(", ", callbacks.Select(c=>string.Format("public {0}", c.type))); - if (callback_base_classes != "") callback_base_classes += ", "; - writer.WriteLine("class {0} final : public Actor<{2}>, {3}public FastAllocated<{1}>, public {4} {{", - className, - fullClassName, - actor.returnType == null ? "void" : actor.returnType, - callback_base_classes, - fullStateClassName - ); - writer.WriteLine("public:"); - writer.WriteLine("\tusing FastAllocated<{0}>::operator new;", fullClassName); - writer.WriteLine("\tusing FastAllocated<{0}>::operator delete;", fullClassName); + if (isTopLevel && actor.nameSpace == null) writer.WriteLine("} // namespace"); // namespace - writer.WriteLine("#pragma clang diagnostic push"); - writer.WriteLine("#pragma clang diagnostic ignored \"-Wdelete-non-virtual-dtor\""); - if (actor.returnType != null) - writer.WriteLine("\tvoid destroy() override {{ ((Actor<{0}>*)this)->~Actor(); operator delete(this); }}", actor.returnType); - else - writer.WriteLine("\tvoid destroy() {{ ((Actor*)this)->~Actor(); operator delete(this); }}"); - writer.WriteLine("#pragma clang diagnostic pop"); - - foreach (var cb in callbacks) - writer.WriteLine("friend struct {0};", cb.type); - - LineNumber(writer, actor.SourceLine); - WriteConstructor(body, writer, fullStateClassName); - //WriteStartFunc(body, writer); - WriteCancelFunc(writer); - writer.WriteLine("};"); - if (isTopLevel && actor.nameSpace == null) writer.WriteLine("}"); // namespace - WriteTemplate(writer); - LineNumber(writer, actor.SourceLine); - foreach (string attribute in actor.attributes) { - writer.Write(attribute + " "); - } - if (actor.isStatic) writer.Write("static "); - writer.WriteLine("{0} {3}{1}( {2} ) {{", fullReturnType, actor.name, string.Join(", ", ParameterList()), actor.nameSpace==null ? "" : actor.nameSpace + "::"); - LineNumber(writer, actor.SourceLine); - - string newActor = string.Format("new {0}({1})", - fullClassName, - string.Join(", ", actor.parameters.Select(p => p.name).ToArray())); - - if (actor.returnType != null) - writer.WriteLine("\treturn Future<{1}>({0});", newActor, actor.returnType); - else - writer.WriteLine("\t{0};", newActor); - writer.WriteLine("}"); + WriteActorFunction(writer, fullReturnType); if (actor.testCaseParameters != null) { @@ -482,6 +534,11 @@ namespace actorcompiler if (generateProbes) { fun.WriteLine("fdb_probe_actor_enter(\"{0}\", {1}, {2});", name, thisAddress, index); } + var blockIdentifier = GetUidFromString(fun.name); + fun.WriteLine("#ifdef WITH_ACAC"); + fun.WriteLine("static constexpr ActorBlockIdentifier __identifier = UID({0}UL, {1}UL);", blockIdentifier.Item1, blockIdentifier.Item2); + fun.WriteLine("ActorExecutionContextHelper __helper(static_cast<{0}*>(this)->activeActorHelper.actorID, __identifier);", className); + fun.WriteLine("#endif // WITH_ACAC"); } void ProbeExit(Function fun, string name, int index = -1) { @@ -1291,20 +1348,29 @@ namespace actorcompiler endIsUnreachable = true, publicName = true }; + + // Initializes class member variables constructor.Indent(codeIndent); constructor.WriteLine( " : Actor<" + (actor.returnType == null ? "void" : actor.returnType) + ">()," ); - constructor.WriteLine( " {0}({1})", fullStateClassName, string.Join(", ", actor.parameters.Select(p => p.name))); + constructor.WriteLine( " {0}({1}),", fullStateClassName, string.Join(", ", actor.parameters.Select(p => p.name))); + constructor.WriteLine( " activeActorHelper(__actorIdentifier)"); constructor.Indent(-1); + constructor.WriteLine("{"); constructor.Indent(+1); + ProbeEnter(constructor, actor.name); + constructor.WriteLine("#ifdef ENABLE_SAMPLING"); constructor.WriteLine("this->lineage.setActorName(\"{0}\");", actor.name); constructor.WriteLine("LineageScope _(&this->lineage);"); - constructor.WriteLine("#endif"); // constructor.WriteLine("getCurrentLineage()->modify(&StackLineage::actorName) = \"{0}\"_sr;", actor.name); + constructor.WriteLine("#endif"); + constructor.WriteLine("this->{0};", body.call()); + ProbeExit(constructor, actor.name); + WriteFunction(writer, constructor, constructor.BodyText); } diff --git a/flow/actorcompiler/ActorParser.cs b/flow/actorcompiler/ActorParser.cs index c07e332596..ace61c7ec2 100644 --- a/flow/actorcompiler/ActorParser.cs +++ b/flow/actorcompiler/ActorParser.cs @@ -1,4 +1,4 @@ -/* +/*; * ActorParser.cs * * This source file is part of the FoundationDB open source project @@ -18,10 +18,9 @@ * limitations under the License. */ -using System; +using System; using System.Collections.Generic; using System.Linq; -using System.Text; using System.Text.RegularExpressions; namespace actorcompiler @@ -242,12 +241,14 @@ namespace actorcompiler string sourceFile; ErrorMessagePolicy errorMessagePolicy; public bool generateProbes; + public Dictionary<(ulong, ulong), string> uidObjects { get; private set; } public ActorParser(string text, string sourceFile, ErrorMessagePolicy errorMessagePolicy, bool generateProbes) { this.sourceFile = sourceFile; this.errorMessagePolicy = errorMessagePolicy; this.generateProbes = generateProbes; + this.uidObjects = new Dictionary<(ulong, ulong), string>(); tokens = Tokenize(text).Select(t=>new Token{ Value=t }).ToArray(); CountParens(); //if (sourceFile.EndsWith(".h")) LineNumbersEnabled = false; @@ -340,15 +341,17 @@ namespace actorcompiler } if (tokens[i].Value == "ACTOR" || tokens[i].Value == "SWIFT_ACTOR" || tokens[i].Value == "TEST_CASE") { - int end; - var actor = ParseActor(i, out end); + var actor = ParseActor(i, out int end); if (classContextStack.Count > 0) { actor.enclosingClass = String.Join("::", classContextStack.Reverse().Select(t => t.name)); } var actorWriter = new System.IO.StringWriter(); actorWriter.NewLine = "\n"; - new ActorCompiler(actor, sourceFile, inBlocks == 0, LineNumbersEnabled, generateProbes).Write(actorWriter); + var actorCompiler = new ActorCompiler(actor, sourceFile, inBlocks == 0, LineNumbersEnabled, generateProbes); + actorCompiler.Write(actorWriter); + actorCompiler.uidObjects.ToList().ForEach(x => this.uidObjects.TryAdd(x.Key, x.Value)); + string[] actorLines = actorWriter.ToString().Split('\n'); bool hasLineNumber = false; diff --git a/flow/actorcompiler/Program.cs b/flow/actorcompiler/Program.cs index cdb8ee8d6e..b9cabbc9c1 100644 --- a/flow/actorcompiler/Program.cs +++ b/flow/actorcompiler/Program.cs @@ -19,15 +19,22 @@ */ using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; using System.IO; namespace actorcompiler { class Program { + private static void OverwriteByMove(string target, string temporaryFile) { + if (File.Exists(target)) + { + File.SetAttributes(target, FileAttributes.Normal); + File.Delete(target); + } + File.Move(temporaryFile, target); + File.SetAttributes(target, FileAttributes.ReadOnly); + } + public static int Main(string[] args) { bool generateProbes = false; @@ -38,7 +45,7 @@ namespace actorcompiler return 100; } Console.WriteLine("actorcompiler {0}", string.Join(" ", args)); - string input = args[0], output = args[1], outputtmp = args[1] + ".tmp"; + string input = args[0], output = args[1], outputtmp = args[1] + ".tmp", outputUid = args[1] + ".uid"; ErrorMessagePolicy errorMessagePolicy = new ErrorMessagePolicy(); foreach (var arg in args) { if (arg.StartsWith("--")) { @@ -52,15 +59,20 @@ namespace actorcompiler try { var inputData = File.ReadAllText(input); - using (var outputStream = new StreamWriter(outputtmp)) - new ActorParser(inputData, input.Replace('\\', '/'), errorMessagePolicy, generateProbes).Write(outputStream, output.Replace('\\', '/')); - if (File.Exists(output)) - { - File.SetAttributes(output, FileAttributes.Normal); - File.Delete(output); + var parser = new ActorParser(inputData, input.Replace('\\', '/'), errorMessagePolicy, generateProbes); + + using (var outputStream = new StreamWriter(outputtmp)) { + parser.Write(outputStream, output.Replace('\\', '/')); } - File.Move(outputtmp, output); - File.SetAttributes(output, FileAttributes.ReadOnly); + OverwriteByMove(output, outputtmp); + + using (var outputStream = new StreamWriter(outputtmp)) { + foreach(var entry in parser.uidObjects) { + outputStream.WriteLine("{0}|{1}|{2}", entry.Key.Item1, entry.Key.Item2, entry.Value); + } + } + OverwriteByMove(outputUid, outputtmp); + return 0; } catch (actorcompiler.Error e) diff --git a/flow/include/flow/ActorContext.h b/flow/include/flow/ActorContext.h new file mode 100644 index 0000000000..091db13962 --- /dev/null +++ b/flow/include/flow/ActorContext.h @@ -0,0 +1,137 @@ +/* + * ActorContext.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2023 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLOW_ACTOR_CONTEXT_H +#define FLOW_ACTOR_CONTEXT_H + +#ifdef WITH_ACAC + +#include +#include +#include +#include +#include +#include + +#include "flow/FastAlloc.h" +#include "flow/FastRef.h" +#include "flow/IRandom.h" + +using ActorIdentifier = UID; +using ActorID = uint64_t; + +static constexpr ActorID INVALID_ACTOR_ID = std::numeric_limits::max(); +static constexpr ActorID INIT_ACTOR_ID = 0; + +struct ActiveActor { + ActorIdentifier identifier = ActorIdentifier(); + ActorID id = ActorID(); + double spawnTime = double(); + + ActorID spawner = INVALID_ACTOR_ID; + + ActiveActor(); + explicit ActiveActor(const ActorIdentifier& identifier_, + const ActorID& id_, + const ActorID& spawnerID_ = INVALID_ACTOR_ID); + + template + void serialize(Ar& ar) { + serializer(ar, identifier, id, spawnTime, spawner); + } +}; + +using ActorBlockIdentifier = UID; + +struct ActorExecutionContext { + ActorID actorID; + + ActorBlockIdentifier blockIdentifier; + + explicit ActorExecutionContext(const ActorID actorID_, const ActorBlockIdentifier blockIdentifier_) + : actorID(actorID_), blockIdentifier(blockIdentifier_) {} +}; + +// Dumps the current ACTORs to a given stream +extern void dumpActors(std::ostream& stream); + +// A helper class that register/unregister the Actor +class ActiveActorHelper { +public: + ActorID actorID; + + ActiveActorHelper(const ActorIdentifier& actorIdentifier); + ~ActiveActorHelper(); +}; + +class ActorExecutionContextHelper { +public: + ActorExecutionContextHelper(const ActorID& actorID_, const ActorBlockIdentifier& blockIdentifier_); + ~ActorExecutionContextHelper(); +}; + +enum class ActorContextDumpType : uint8_t { + FULL_CONTEXT, + CURRENT_STACK, + CURRENT_CALL_BACKTRACE, +}; + +// Encode the current actor context into a string +extern std::string encodeActorContext(const ActorContextDumpType dumpType = ActorContextDumpType::FULL_CONTEXT); + +struct DecodedActorContext { + struct ActorInfo { + ActorID id; + ActorIdentifier identifier; + ActorID spawnerID; + + ActorInfo(const ActorID& _id, const ActorIdentifier& _identifier, const ActorID& _spawnerID) + : id(_id), identifier(_identifier), spawnerID(_spawnerID) {} + }; + ActorID currentRunningActor; + std::vector context; + ActorContextDumpType dumpType; +}; + +// Decode the serialized actor context to DecodedActorContext +DecodedActorContext decodeActorContext(const std::string& caller); + +#else // WITH_ACAC + +#include + +#include "flow/IRandom.h" + +using ActorID = uint64_t; +using ActorIdentifier = UID; +using ActorBlockIdentifier = UID; + +struct ActorExecutionContext {}; +struct ActiveActor {}; +struct ActiveActorHelper { + ActiveActorHelper(const ActorIdentifier&) {} +}; +struct ActorExecutionContextHelper { + ActorExecutionContextHelper(const ActorID&, const ActorBlockIdentifier&); +}; + +#endif // WITH_ACAC + +#endif // FLOW_ACTOR_CONTEXT_H diff --git a/flow/include/flow/Arena.h b/flow/include/flow/Arena.h index 1f2d6cfd82..068f7a8e52 100644 --- a/flow/include/flow/Arena.h +++ b/flow/include/flow/Arena.h @@ -23,10 +23,10 @@ #pragma once #include "flow/BooleanParam.h" +#include "flow/Error.h" #include "flow/FastAlloc.h" #include "flow/FastRef.h" -#include "flow/Error.h" -#include "flow/Trace.h" +#include "flow/IRandom.h" #include "flow/ObjectSerializerTraits.h" #include "flow/FileIdentifier.h" #include "flow/swift_support.h" diff --git a/flow/include/flow/Error.h b/flow/include/flow/Error.h index 43ba48c1ba..8ea3ec3c0e 100644 --- a/flow/include/flow/Error.h +++ b/flow/include/flow/Error.h @@ -26,11 +26,13 @@ #include #include #include +#include "flow/ActorContext.h" #include "flow/Platform.h" #include "flow/Knobs.h" #include "flow/FileIdentifier.h" #include "flow/ObjectSerializerTraits.h" #include "flow/Traceable.h" +#include enum { invalid_error_code = 0xffff }; @@ -57,7 +59,7 @@ public: serializer(ar, error_code); } - Error() : error_code(invalid_error_code), flags(0) {} + Error(); explicit Error(int error_code); static void init(); diff --git a/flow/include/flow/FastAlloc.h b/flow/include/flow/FastAlloc.h index ac08234b49..af5fe9bb7b 100644 --- a/flow/include/flow/FastAlloc.h +++ b/flow/include/flow/FastAlloc.h @@ -22,7 +22,6 @@ #define FLOW_FASTALLOC_H #pragma once -#include "flow/Error.h" #include "flow/Platform.h" #include "flow/config.h" diff --git a/flow/include/flow/IAsyncFile.h b/flow/include/flow/IAsyncFile.h index 94785c0a40..df62e7383a 100644 --- a/flow/include/flow/IAsyncFile.h +++ b/flow/include/flow/IAsyncFile.h @@ -65,6 +65,8 @@ public: virtual void addref() = 0; virtual void delref() = 0; + virtual StringRef getClassName() { return "IAsyncFile"_sr; } + // For read() and write(), the data buffer must remain valid until the future is ready virtual Future read(void* data, int length, diff --git a/flow/include/flow/IRandom.h b/flow/include/flow/IRandom.h index 227a5b79d5..d33477299b 100644 --- a/flow/include/flow/IRandom.h +++ b/flow/include/flow/IRandom.h @@ -93,8 +93,8 @@ public: static UID fromStringThrowsOnFailure(std::string const&); template - void serialize_unversioned( - Ar& ar) { // Changing this serialization format will affect key definitions, so can't simply be versioned! + void serialize_unversioned(Ar& ar) { + // Changing this serialization format will affect key definitions, so can't simply be versioned! serializer(ar, part[0], part[1]); } }; diff --git a/flow/include/flow/Optional.h b/flow/include/flow/Optional.h index a3f9d8b52e..458129ccf4 100644 --- a/flow/include/flow/Optional.h +++ b/flow/include/flow/Optional.h @@ -26,13 +26,13 @@ #include "flow/Traceable.h" #include "flow/FileIdentifier.h" -#include "flow/Error.h" #include "flow/swift_support.h" #ifdef WITH_SWIFT #include #endif class Arena; +class Void; // Optional is a wrapper for std::optional. There // are two primary reasons to use this wrapper instead diff --git a/flow/include/flow/flow.h b/flow/include/flow/flow.h index ada0589045..a72bc781f6 100644 --- a/flow/include/flow/flow.h +++ b/flow/include/flow/flow.h @@ -20,6 +20,7 @@ #ifndef FLOW_FLOW_H #define FLOW_FLOW_H +#include "flow/ActorContext.h" #include "flow/Arena.h" #include "flow/FastRef.h" #pragma once @@ -31,25 +32,29 @@ #pragma warning(error : 4239) #endif -#include -#include -#include -#include -#include #include +#include +#include #include #include +#include +#include +#include +#include +#include +#include #include "flow/CodeProbe.h" -#include "flow/Platform.h" -#include "flow/FastAlloc.h" -#include "flow/IRandom.h" -#include "flow/serialize.h" #include "flow/Deque.h" -#include "flow/ThreadPrimitives.h" -#include "flow/network.h" +#include "flow/Error.h" +#include "flow/FastAlloc.h" #include "flow/FileIdentifier.h" +#include "flow/IRandom.h" +#include "flow/Platform.h" +#include "flow/ThreadPrimitives.h" #include "flow/WriteOnlySet.h" +#include "flow/network.h" +#include "flow/serialize.h" #ifdef WITH_SWIFT #include @@ -58,7 +63,7 @@ // without relying on any imported Swift types. #ifndef SWIFT_HIDE_CHECKED_CONTINUTATION #include "SwiftModules/Flow_CheckedContinuation.h" -#endif /* SWIFT_HIDE_CHECKED_CONTINUATION */ +#endif /* SWIFT_HIDE_CHECKED_CONTINUATION */ #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wnullability-completeness" @@ -953,15 +958,12 @@ public: using AssociatedFuture = Future; private: - SwiftCC continuationInstance; + SwiftCC continuationInstance; public: - FlowCallbackForSwiftContinuation() : - continuationInstance(SwiftCC::init()) {} + FlowCallbackForSwiftContinuation() : continuationInstance(SwiftCC::init()) {} - void set(const void * _Nonnull pointerToContinuationInstance, - Future f, - const void * _Nonnull thisPointer) { + void set(const void* _Nonnull pointerToContinuationInstance, Future f, const void* _Nonnull thisPointer) { // Verify Swift did not make a copy of the `self` value for this method // call. assert(this == thisPointer); @@ -969,16 +971,16 @@ public: // FIXME: Propagate `SwiftCC` to Swift using forward // interop, without relying on passing it via a `void *` // here. That will let us avoid this hack. - const void *_Nonnull opaqueStorage = pointerToContinuationInstance; - static_assert(sizeof(SwiftCC) == sizeof(const void *)); - const SwiftCC ccCopy(*reinterpret_cast(&opaqueStorage)); + const void* _Nonnull opaqueStorage = pointerToContinuationInstance; + static_assert(sizeof(SwiftCC) == sizeof(const void*)); + const SwiftCC ccCopy(*reinterpret_cast(&opaqueStorage)); // Set the continuation instance. continuationInstance.set(ccCopy); // Add this callback to the future. f.addCallbackAndClear(this); } - void fire(const T &value) override { + void fire(const T& value) override { Callback::remove(); Callback::next = nullptr; continuationInstance.resume(value); @@ -996,14 +998,13 @@ public: #endif /* WITH_SWIFT*/ template -class -SWIFT_SENDABLE +class SWIFT_SENDABLE #ifndef SWIFT_HIDE_CHECKED_CONTINUTATION #ifdef WITH_SWIFT SWIFT_CONFORMS_TO_PROTOCOL(flow_swift.FlowFutureOps) #endif #endif -Future { + Future { public: using Element = T; #ifdef WITH_SWIFT @@ -1121,18 +1122,15 @@ public: sav->send(std::forward(value)); } - // Swift can't call method that takes in a universal references (U&&), - // so provide a callable `send` method that copies the value. - void sendCopy(const T& valueCopy) const SWIFT_NAME(send(_:)) { - sav->send(valueCopy); - } + // Swift can't call method that takes in a universal references (U&&), + // so provide a callable `send` method that copies the value. + void sendCopy(const T& valueCopy) const SWIFT_NAME(send(_:)) { sav->send(valueCopy); } template void sendError(const E& exc) const { sav->sendError(exc); } - SWIFT_CXX_IMPORT_UNSAFE Future getFuture() const { sav->addFutureRef(); return Future(sav); @@ -1352,15 +1350,14 @@ public: bool operator==(const FutureStream& rhs) { return rhs.queue == queue; } bool operator!=(const FutureStream& rhs) { return rhs.queue != queue; } - // FIXME: remove annotation after https://github.com/apple/swift/issues/64316 is fixed. - T pop() __attribute__((swift_attr("import_unsafe"))) { return queue->pop(); } + // FIXME: remove annotation after https://github.com/apple/swift/issues/64316 is fixed. + T pop() __attribute__((swift_attr("import_unsafe"))) { return queue->pop(); } Error getError() const { ASSERT(queue->isError()); return queue->error; } - explicit FutureStream(NotifiedQueue* queue) : queue(queue) { - } + explicit FutureStream(NotifiedQueue* queue) : queue(queue) {} private: NotifiedQueue* queue; @@ -1402,15 +1399,9 @@ public: // stream.send( request ) // Unreliable at most once delivery: Delivers request unless there is a connection failure (zero or one times) - void send(const T& value) { - queue->send(value); - } - void sendCopy(T value) { - queue->send(value); - } - void send(T&& value) { - queue->send(std::move(value)); - } + void send(const T& value) { queue->send(value); } + void sendCopy(T value) { queue->send(value); } + void send(T&& value) { queue->send(std::move(value)); } void sendError(const Error& error) { queue->sendError(error); } // stream.getReply( request ) @@ -1443,7 +1434,7 @@ public: // Not const, because this function gives mutable // access to queue - SWIFT_CXX_IMPORT_UNSAFE FutureStream getFuture() { + SWIFT_CXX_IMPORT_UNSAFE FutureStream getFuture() { queue->addFutureRef(); return FutureStream(queue); }