diff --git a/cmake/FindCoroutines.cmake b/cmake/FindCoroutines.cmake new file mode 100644 index 0000000000..b2efd1aceb --- /dev/null +++ b/cmake/FindCoroutines.cmake @@ -0,0 +1,282 @@ +# Copyright (c) 2019-present, Facebook, Inc. +# +# This source code is licensed under the Apache License found in the +# LICENSE.txt file in the root directory of this source tree. + +#[=======================================================================[.rst: + +FindCoroutines +############## + +This module supports the C++ standard support for coroutines. Use +the :imp-target:`std::coroutines` imported target to + +Options +******* + +The ``COMPONENTS`` argument to this module supports the following values: + +.. find-component:: Experimental + :name: coro.Experimental + + Allows the module to find the "experimental" Coroutines TS + version of the coroutines library. This is the library that should be + used with the ``std::experimental`` namespace. + +.. find-component:: Final + :name: coro.Final + + Finds the final C++20 standard version of coroutines. + +If no components are provided, behaves as if the +:find-component:`coro.Final` component was specified. + +If both :find-component:`coro.Experimental` and :find-component:`coro.Final` are +provided, first looks for ``Final``, and falls back to ``Experimental`` in case +of failure. If ``Final`` is found, :imp-target:`std::coroutines` and all +:ref:`variables ` will refer to the ``Final`` version. + + +Imported Targets +**************** + +.. imp-target:: std::coroutines + + The ``std::coroutines`` imported target is defined when any requested + version of the C++ coroutines library has been found, whether it is + *Experimental* or *Final*. + + If no version of the coroutines library is available, this target will not + be defined. + + .. note:: + This target has ``cxx_std_17`` as an ``INTERFACE`` + :ref:`compile language standard feature `. Linking + to this target will automatically enable C++17 if no later standard + version is already required on the linking target. + + +.. _coro.variables: + +Variables +********* + +.. variable:: CXX_COROUTINES_HAVE_COROUTINES + + Set to ``TRUE`` when coroutines are supported in both the language and the + library. + +.. variable:: CXX_COROUTINES_HEADER + + Set to either ``coroutine`` or ``experimental/coroutine`` depending on + whether :find-component:`coro.Final` or :find-component:`coro.Experimental` was + found. + +.. variable:: CXX_COROUTINES_NAMESPACE + + Set to either ``std`` or ``std::experimental`` + depending on whether :find-component:`coro.Final` or + :find-component:`coro.Experimental` was found. + + +Examples +******** + +Using `find_package(Coroutines)` with no component arguments: + +.. code-block:: cmake + + find_package(Coroutines REQUIRED) + + add_executable(my-program main.cpp) + target_link_libraries(my-program PRIVATE std::coroutines) + + +#]=======================================================================] + + +if(TARGET std::coroutines) + # This module has already been processed. Don't do it again. + return() +endif() + +include(CheckCXXCompilerFlag) +include(CMakePushCheckState) +include(CheckIncludeFileCXX) +include(CheckCXXSourceCompiles) + +cmake_push_check_state() + +set(CMAKE_REQUIRED_QUIET ${Coroutines_FIND_QUIETLY}) + +check_cxx_compiler_flag(/await _CXX_COROUTINES_SUPPORTS_MS_FLAG) +check_cxx_compiler_flag(/await:heapelide _CXX_COROUTINES_SUPPORTS_MS_HEAPELIDE_FLAG) +check_cxx_compiler_flag(-fcoroutines-ts _CXX_COROUTINES_SUPPORTS_TS_FLAG) +check_cxx_compiler_flag(-fcoroutines _CXX_COROUTINES_SUPPORTS_CORO_FLAG) + +if(_CXX_COROUTINES_SUPPORTS_MS_FLAG) + set(_CXX_COROUTINES_EXTRA_FLAGS "/await") + if(_CXX_COROUTINES_SUPPORTS_MS_HEAPELIDE_FLAG AND CMAKE_SIZEOF_VOID_P GREATER_EQUAL 8) + list(APPEND _CXX_COROUTINES_EXTRA_FLAGS "/await:heapelide") + endif() +elseif(_CXX_COROUTINES_SUPPORTS_TS_FLAG) + set(_CXX_COROUTINES_EXTRA_FLAGS "-fcoroutines-ts") +elseif(_CXX_COROUTINES_SUPPORTS_CORO_FLAG) + set(_CXX_COROUTINES_EXTRA_FLAGS "-fcoroutines") +endif() + +# Normalize and check the component list we were given +set(want_components ${Coroutines_FIND_COMPONENTS}) +if(Coroutines_FIND_COMPONENTS STREQUAL "") + set(want_components Final) +endif() + +# Warn on any unrecognized components +set(extra_components ${want_components}) +list(REMOVE_ITEM extra_components Final Experimental) +foreach(component IN LISTS extra_components) + message(WARNING "Extraneous find_package component for Coroutines: ${component}") +endforeach() + +# Detect which of Experimental and Final we should look for +set(find_experimental TRUE) +set(find_final TRUE) +if(NOT "Final" IN_LIST want_components) + set(find_final FALSE) +endif() +if(NOT "Experimental" IN_LIST want_components) + set(find_experimental FALSE) +endif() + +if(find_final) + check_include_file_cxx("coroutine" _CXX_COROUTINES_HAVE_HEADER) + if(_CXX_COROUTINES_HAVE_HEADER) + check_cxx_source_compiles("#include \n typedef std::suspend_never blub; \nint main() {} " _CXX_COROUTINES_FINAL_HEADER_COMPILES) + set(_CXX_COROUTINES_HAVE_HEADER "${_CXX_COROUTINES_FINAL_HEADER_COMPILES}") + endif() + + if(NOT _CXX_COROUTINES_HAVE_HEADER) + cmake_push_check_state() + set(CMAKE_REQUIRED_FLAGS "${_CXX_COROUTINES_EXTRA_FLAGS}") + check_include_file_cxx("coroutine" _CXX_COROUTINES_HAVE_HEADER_WITH_FLAG) + if(_CXX_COROUTINES_HAVE_HEADER_WITH_FLAG) + check_cxx_source_compiles("#include \n typedef std::suspend_never blub; \nint main() {} " _CXX_COROUTINES_FINAL_HEADER_COMPILES_WITH_FLAG) + set(_CXX_COROUTINES_HAVE_HEADER_WITH_FLAG "${_CXX_COROUTINES_FINAL_HEADER_COMPILES_WITH_FLAG}") + endif() + set(_CXX_COROUTINES_HAVE_HEADER "${_CXX_COROUTINES_HAVE_HEADER_WITH_FLAG}") + cmake_pop_check_state() + endif() + mark_as_advanced(_CXX_COROUTINES_HAVE_HEADER) + if(_CXX_COROUTINES_HAVE_HEADER) + # We found the non-experimental header. Don't bother looking for the + # experimental one. + set(find_experimental FALSE) + endif() +else() + set(_CXX_COROUTINES_HAVE_HEADER FALSE) +endif() + +if(find_experimental) + check_include_file_cxx("experimental/coroutine" _CXX_COROUTINES_HAVE_EXPERIMENTAL_HEADER) + if(NOT _CXX_COROUTINES_HAVE_EXPERIMENTAL_HEADER) + cmake_push_check_state() + set(CMAKE_REQUIRED_FLAGS "${_CXX_COROUTINES_EXTRA_FLAGS}") + check_include_file_cxx("experimental/coroutine" _CXX_COROUTINES_HAVE_EXPERIMENTAL_HEADER_WITH_FLAG) + set(_CXX_COROUTINES_HAVE_EXPERIMENTAL_HEADER "${_CXX_COROUTINES_HAVE_EXPERIMENTAL_HEADER_WITH_FLAG}") + cmake_pop_check_state() + endif() + mark_as_advanced(_CXX_COROUTINES_HAVE_EXPERIMENTAL_HEADER) +else() + set(_CXX_COROUTINES_HAVE_EXPERIMENTAL_HEADER FALSE) +endif() + +if(_CXX_COROUTINES_HAVE_HEADER) + set(_have_coro TRUE) + set(_coro_header coroutine) + set(_coro_namespace std) +elseif(_CXX_COROUTINES_HAVE_EXPERIMENTAL_HEADER) + set(_have_coro TRUE) + set(_coro_header experimental/coroutine) + set(_coro_namespace std::experimental) +else() + set(_have_coro FALSE) +endif() + +set(CXX_COROUTINES_HAVE_COROUTINES ${_have_coro} CACHE BOOL "TRUE if we have the C++ coroutines feature") +set(CXX_COROUTINES_HEADER ${_coro_header} CACHE STRING "The header that should be included to obtain the coroutines APIs") +set(CXX_COROUTINES_NAMESPACE ${_coro_namespace} CACHE STRING "The C++ namespace that contains the coroutines APIs") + +set(_found FALSE) + +if(CXX_COROUTINES_HAVE_COROUTINES) + # We have some coroutines library available. Do link checks + string(CONFIGURE [[ + #include + #include <@CXX_COROUTINES_HEADER@> + + struct present { + struct promise_type { + int result; + present get_return_object() { return present{*this}; } + @CXX_COROUTINES_NAMESPACE@::suspend_never initial_suspend() { return {}; } + @CXX_COROUTINES_NAMESPACE@::suspend_always final_suspend() noexcept { return {}; } + void return_value(int i) { result = i; } + void unhandled_exception() {} + }; + friend struct promise_type; + present(present&& that) : coro_(std::exchange(that.coro_, {})) {} + ~present() { if(coro_) coro_.destroy(); } + bool await_ready() const { return true; } + void await_suspend(@CXX_COROUTINES_NAMESPACE@::coroutine_handle<>) const {} + int await_resume() const { return coro_.promise().result; } + private: + present(promise_type& promise) + : coro_(@CXX_COROUTINES_NAMESPACE@::coroutine_handle::from_promise(promise)) {} + @CXX_COROUTINES_NAMESPACE@::coroutine_handle coro_; + }; + + present f(int n) { + if (n < 2) + co_return 1; + else + co_return n * co_await f(n - 1); + } + + int main() { + return f(5).await_resume() != 120; + } + ]] code @ONLY) + + # Try to compile a simple coroutines program without any compiler flags + check_cxx_source_compiles("${code}" CXX_COROUTINES_NO_AWAIT_NEEDED) + + set(can_link ${CXX_COROUTINES_NO_AWAIT_NEEDED}) + + if(NOT CXX_COROUTINES_NO_AWAIT_NEEDED) + # Add the -fcoroutines-ts (or /await) flag + set(CMAKE_REQUIRED_FLAGS "${_CXX_COROUTINES_EXTRA_FLAGS}") + check_cxx_source_compiles("${code}" CXX_COROUTINES_AWAIT_NEEDED) + set(can_link "${CXX_COROUTINES_AWAIT_NEEDED}") + endif() + + if(can_link) + add_library(std::coroutines INTERFACE IMPORTED) + set(_found TRUE) + + if(CXX_COROUTINES_NO_AWAIT_NEEDED) + # Nothing to add... + elseif(CXX_COROUTINES_AWAIT_NEEDED) + target_compile_options(std::coroutines INTERFACE ${_CXX_COROUTINES_EXTRA_FLAGS}) + endif() + else() + set(CXX_COROUTINES_HAVE_COROUTINES FALSE) + endif() +endif() + +cmake_pop_check_state() + +set(Coroutines_FOUND ${_found} CACHE BOOL "TRUE if we can compile and link a program using std::coroutines" FORCE) + +if(Coroutines_FIND_REQUIRED AND NOT Coroutines_FOUND) + message(FATAL_ERROR "Cannot compile simple program using std::coroutines. Is C++17 or later activated?") +endif() diff --git a/design/coroutines.md b/design/coroutines.md new file mode 100644 index 0000000000..f1f85d4d21 --- /dev/null +++ b/design/coroutines.md @@ -0,0 +1,700 @@ +# Coroutines in Flow + +* [Introduction](#Introduction) +* [Coroutines vs ACTORs](#coroutines-vs-actors) +* [Basic Types](#basic-types) +* [Choose-When](#choose-when) + * [Execution in when-expressions](#execution-in-when-expressions) + * [Waiting in When-Blocks](#waiting-in-when-blocks) +* [Generators](#generators) + * [Generators and ranges](#generators-and-ranges) + * [Eager vs Lazy Execution](#eager-vs-lazy-execution) + * [Generators vs Promise Streams](#generators-vs-promise-streams) +* [Uncancellable](#uncancellable) +* [Porting ACTOR's to C++ Coroutines](#porting-actors-to-c-coroutines) + * [Lifetime of Locals](#lifetime-of-locals) + * [Unnecessary Helper Actors](#unnecessary-helper-actors) + * [Replace Locals with Temporaries](#replace-locals-with-temporaries) + * [Don't Wait in Error-Handlers](#dont-wait-in-error-handlers) + * [Make Static Functions Class Members](#make-static-functions-class-members) + * [Initialization of Locals](#initialization-of-locals) + +## Introduction + +In the past Flow implemented an actor mode by shipping its own compiler which would +extend the C++ language with a few additional keywords. This, while still supported, +is deprecated in favor of the standard C++20 coroutines. + +Coroutines are meant to be simple, look like serial code, and be easy to reason about. +As simple example for a coroutine function can look like this: + +```c++ +Future simpleCoroutine() { + double begin = now(); + co_await delay(1.0); + co_return now() - begin; +} +``` + +This document assumes some familiarity with Flow. As of today, actors and coroutines +can be freely mixed, but new code should be written using coroutines. + +## Coroutines vs ACTORs + +It is important to understand that C++ coroutine support doesn't change anything in Flow: they are not a replacement +of Flow but they replace the actor compiler with a C++ compiler. This means, that the network loop, all Flow types, +the RPC layer, and the simulator all remain unchanged. A coroutine simply returns a special `SAV` which has handle +to a coroutine. + +## Basic Types + +As defined in the C++20 standard, a function is a coroutine if its body contains at least one `co_await`, `co_yield`, +or `co_return` statement. However, in order for this to work, the return type needs an underlying coroutine +implementation. Flow provides these for the following types: + +* `Future` is the primary type we use for coroutines. A coroutine returning + `Future` is allowed to `co_await` other coroutines and it can `co_return` + a single value. `co_yield` is not implemented by this type. + * A special case is `Future`. Void-Futures are what a user would probably + expect `Future<>` to be (it has this type for historical reasons and to + provide compatibility with old Flow `ACTOR`s). A coroutine with return type + `Future` must not return anything. So either the coroutine can run until + the end, or it can be terminated by calling `co_return`. +* `Generator` can return a stream of values. However, they can't `co_await` + other coroutines. These are useful for streams where the values are lazily + computed but don't involve any IO. +* `AsyncGenerator` is similar to `Generator` in that it can return a stream + of values, but in addition to that it can also `co_await` other coroutines. + Due to that, they're slightly less efficient than `Generator`. + `AsyncGenerator` should be used whenever values should be lazily generated + AND need IO. It is an alternative to `PromiseStream`, which can be more efficient, but is + more intuitive to use correctly. + +A more detailed explanation of `Generator` and `AsyncGenerator` can be +found further down. + +## Choose-When + +In actor compiled code we were able to use the keywords `choose` and `when` to wait on a +statically known number of futures and execute corresponding code. Something like this: + +```c++ +choose { + when(wait(future1)) { + // do something + } + when(Foo f = wait(foo())) { + // do something else + } +} +``` + +Since this is a compiler functionality, we can't use this with C++ coroutines. We could +keep only this feature around, but only using standard C++ is desirable. So instead, we +introduce a new `class` called `Choose` to achieve something very similar: + +```c++ +co_await Choose() + .When(future1, [](Void& const) { + // do something + }) + .When(foo(), [](Foo const& f) { + // do something else + }).Run(); +``` + +While `Choose` and `choose` behave very similarly, there are some minor differences between +the two. These are explained below. + +### Execution in when-expressions + +In the above example, there is one, potentially important difference between the old and new +style: in the statement `when(Foo f = wait(foo()))` is only executed if `future1` is not ready. +Depending on what the intent of the statement is, this could be desirable. Since `Choose::When` +is a normal method, `foo()` will be evaluated whether the statement is already done or not. +This can be worked around by passing a lambda that returns a Future instead: + +```c++ +co_await Choose() + .When(future1, [](Void& const) { + // do something + }) + .When([](){ return foo() }, [](Foo const& f) { + // do something else + }).Run(); +``` + +The implementation of `When` will guarantee that this lambda will only be executed if all previous +`When` calls didn't receive a ready future. + +## Waiting in When-Blocks + +In FDB we sometimes see this pattern: + +```c++ +loop { + choose { + when(RequestA req = waitNext(requestAStream.getFuture())) { + wait(handleRequestA(req)); + } + when(RequestB req = waitNext(requestBStream.getFuture())) { + wait(handleRequestb(req)); + } + //... + } +} +``` + +This is not possible to do with `Choose`. However, this is done deliberately as the above is +considered an antipattern: This means that we can't serve two requests concurrently since the loop +won't execute until the request has been served. Instead, this should be written like this: + +```c++ +state ActorCollection actors(false); +loop { + choose { + when(RequestA req = waitNext(requestAStream.getFuture())) { + actors.add(handleRequestA(req)); + } + when(RequestB req = waitNext(requestBStream.getFuture())) { + actors.add(handleRequestb(req)); + } + //... + when(wait(actors.getResult())) { + // this only makes sure that errors are thrown correctly + UNREACHABLE(); + } + } +} +``` + +And so the above can easily be rewritten using `Choose`: + +```c++ +ActorCollection actors(false); +loop { + co_await Choose() + .When(requestAStream.getFuture(), [&actors](RequestA const& req) { + actors.add(handleRequestA(req)); + }) + .When(requestBStream.getFuture(), [&actors](RequestB const& req) { + actors.add(handleRequestB(req)); + }) + .When(actors.getResult(), [](Void const&) { + UNREACHABLE(); + }).run(); +} +``` + +However, often using `choose`-`when` (or `Choose().When`) is overkill and other facilities like `quorum` and +`operator||` should be used instead. For example this: + +```c++ +choose { + when(R res = wait(f1)) { + return res; + } + when(wait(timeout(...))) { + throw io_timeout(); + } +} +``` + +Should be written like this: + +```c++ +co_await (f1 || timeout(...)); +if (f1.isReady()) { + co_return f1.get(); +} +throw io_timeout(); +``` + +(The above could also be packed into a helper function in `genericactors.actor.h`). + +## Generators + +With C++ coroutines we introduce two new basic types in Flow: `Generator` and `AsyncGenerator`. A generator is a +special type of coroutine, which can return multiple values. + +`Generator` and `AsyncGenerator` implement a different interface and serve a very different purpose. +`Generator` conforms to the `input_iterator` trait -- so it can be used like a normal iterator (with the exception +that copying the iterator has a different semantics). This also means that it can be used with the new `ranges` +library in STL which was introduced in C++20. + +`AsyncGenerator` implements the `()` operator which returns a new value every time it is called. However, this value +HAS to be waited for (dropping it and attempting to call `()` again will result in undefined behavior!). This semantic +difference allows an author to mix `co_await` and `co_yield` statements in a coroutine returning `AsyncGenerator`. + +Since generators can produce infinitely long streams, they can be useful to use in places where we'd otherwise use a +more complex in-line loop. For example, consider the code in `masterserver.actor.cpp` that is responsible generate +version numbers. The logic for this code is currently in a long function. With a `Generator` it can be isolated to +one simple coroutine (which can be a direct member of `MasterData`). A simplified version of such a generator could +look as follows: + +```c++ +Generator MasterData::versionGenerator() { + auto prevVersion = lastEpochEnd; + auto lastVersionTime = now(); + while (true) { + auto t1 = now(); + Version toAdd = + std::max(1, + std::min(SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS, + SERVER_KNOBS->VERSIONS_PER_SECOND * (t1 - self->lastVersionTime))); + lastVersionTime = t1; + co_yield prevVersion + toAdd; + prevVersion += toAdd; + } +} +``` + +Now that the logic to compute versions is separated, `MasterData` can simply create an instance of `Generator` +by calling `auto vGenerator = MasterData::versionGenerator();` (and possibly storing that as a class member). It can +then access the current version by calling `*vGenerator` and go to the next generator by incrementing the iterator +(`++vGenerator`). + +`AsyncGenerator` should be used in some places where we used promise streams before (though not all of them, this +topic is discussed a bit later). For example: + +```c++ +template +AsyncGenerator filter(AsyncGenerator gen, F pred) { + while (gen) { + auto val = co_await gen(); + if (pred(val)) { + co_yield val; + } + } +} +``` + +Note how much simpler this function is compared to the old flow function: + +```c++ +ACTOR template +Future filter(FutureStream input, F pred, PromiseStream output) { + loop { + try { + T nextInput = waitNext(input); + if (pred(nextInput)) + output.send(nextInput); + } catch (Error& e) { + if (e.code() == error_code_end_of_stream) { + break; + } else + throw; + } + } + + output.sendError(end_of_stream()); + + return Void(); +} +``` + +A `FutureStream` can be converted into an `AsyncGenerator` by using a simple helper function: + +```c++ +template +AsyncGenerator toGenerator(FutureStream stream) { + loop { + try { + co_yield co_await stream; + } catch (Error& e) { + if (e.code() == error_code_end_of_stream) { + co_return; + } + throw; + } + } +} +``` + +### Generators and Ranges + +`Generator` can be used like an input iterator. This means, that it can also be used with `std::ranges`. Consider +the following coroutine: + +```c++ +// returns base^0, base^1, base^2, ... +Generator powersOf(double base) { + double curr = 1; + loop { + co_yield curr; + curr *= base; + } +} +``` + +We can use this now to generate views. For example: + +```c++ +for (auto v : generatorRange(powersOf(2)) + | std::ranges::views::filter([](auto v) { return v > 10; }) + | std::ranges::views::take(10)) { + fmt::print("{}\n", v); +} +``` + +The above would print all powers of two between 10 and 2^10. + +### Eager vs Lazy Execution + +One major difference between async generators and tasks (coroutines returning only one value through `Future`) is the +execution policy: An async generator will immediately suspend when it is called while a task will immediately start +execution and needs to be explicitly scheduled. + +This is a conscious design decision. Lazy execution makes it much simpler to reason about memory ownership. For example, +the following is ok: + +```c++ +Generator randomStrings(int minLen, int maxLen) { + Arena arena; + auto buffer = new (arena) uint8_t[maxLen + 1]; + while (true) { + auto sz = deterministicRandom()->randomInt(minLen, maxLen + 1); + for (int i = 0; i < sz; ++i) { + buffer[i] = deterministicRandom()->randomAlphaNumeric(); + } + co_yield StringRef(buffer, sz); + } +} +``` + +The above coroutine returns a stream of random strings. The memory is owned by the coroutine and so it always returns +a `StringRef` and then reuses the memory in the next iteration. This makes this generator very cheap to use, as it only +does one allocation in its lifetime. With eager execution, this would be much harder to write (and reason about): the +coroutine would immediately generate a string and then eagerly compute the next one when the string is retrieved. +However, in Flow a `co_yield` is guarantee to suspend the coroutine until the value was consumed (this is not generally +a guarantee with `co_yield` -- C++ coroutines give the implementor a great degree of freedom over decisions like this). + +### Generators vs Promise Streams + +Flow provides another mechanism to send streams of messages between actors: `PromiseStream`. In fact, +`AsyncGenerator` uses `PromiseStream` internally. So when should one be used over the other? + +As a general rule of thumb: whenever possible, use `Generator`, if not, use `AsyncGenerator` if in doubt. + +For pure computation it almost never makes sense to use a `PromiseStream` (the only exception is if computation +can be expensive enough that `co_await yield()` becomes necessary). `Generator` is more lightweight and therefore +usually more efficient. It is also easier to use. + +When it comes to IO it becomes a bit more tricky. Assume we want to scan a file on disk, and we want to read it in +4k blocks. This can be done quite elegantly using a coroutine: + +```c++ +AsyncGenerator> blockScanner(Reference file) { + auto sz = co_await file->size(); + decltype(sz) offset = 0; + constexpr decltype(sz) blockSize = 4*1024; + while (offset < sz) { + Arena arena; + auto block = new (arena) int8_t[blockSize]; + auto toRead = std::min(sz - offset, blockSize); + auto r = co_await file->read(block, toRead, offset); + co_yield Standalone(StringRef(block, r), arena); + offset += r; + } +} +``` + +The problem with the above generator though, is that we only start reading when the generator is invoked. If consuming +the block takes sometimes a long time (for example because it has to be written somewhere), each call will take as long +as the disk latency is for a read. + +What if we want to hide this latency? In other words: what if we want to improve throughput and end-to-end latency by +prefetching? + +Doing this with a generator, while not trivial, is possible. But here it might be easier to use a `PromiseStream` +(we can even reuse the above generator): + +```c++ +Future blockScannerWithPrefetch(Reference file, + PromiseStream promise, + FlowLock lock) { + auto generator = blockScanner(file); + while (generator) { + { + FlowLock::Releaser _(co_await lock.take()); + try { + promise.send(co_await generator()); + } catch (Error& e) { + promise.sendError(e); + co_return; + } + } + // give caller opportunity to take the lock + co_await yield(); + } +} +``` + +With the above the caller can control the prefetching dynamically by taking the lock if the queue becomes too full. + +## Uncancellable + +By default, a coroutine runs until it is either done (reaches the end of the function body, a `co_return` statement, +or throws an exception) or the last `Future` object referencing that object is being dropped. The second use-case is +implemented as follows: + +1. When the future count of a coroutine goes to `0`, the coroutine is immediately resumed and `actor_cancelled` is + thrown within that coroutine (this allows the coroutine to do some cleanup work). +2. Any attempt to run `co_await expr` will immediately throw `actor_cancelled`. + +However, some coroutines aren't safe to be cancelled. This usually concerns disk IO operations. With `ACTOR` we could +either have a return-type `void` or use the `UNCANCELLABLE` keyword to change this behavior: in this case, calling +`Future::cancel()` would be a no-op and dropping all futures wouldn't cause cancellation. + +However, with C++ coroutines, this won't work: + +* We can't introduce new keywords in pure C++ (so `UNCANCELLABLE` would require some preprocessing). +* Implementing a `promise_type` for `void` isn't a good idea, as this would make any `void`-function potentially a + coroutine. + +However, this can also be seen as an opportunity: uncancellable actors are always a bit tricky to use, since we need to +make sure that the caller keeps all memory alive that the uncancellable coroutine might reference until it is done. +Because of that, whenever someone calls a coroutine, they need to be extra careful. However, someone might not know that +the coroutine they call is uncancellable. + +We address this problem with the following definition: + +--- +*Definition*: + +A coroutine is uncancellable if the first argument (or the second, if the coroutine is a class-member) is of type +`Uncancellable` + +--- + +The definition of `Uncancellable` is trivial: `struct Uncancellable {};` -- it is simply used as a marker. So now, if +a user calls an uncancellable coroutine, it will be obvious on the caller side. For example the following is *never* +uncancellable: + +```c++ +co_await foo(); +``` + +But this one is: + +```c++ +co_await bar(Uncancellable()); +``` + +## Porting `ACTOR`'s to C++ Coroutines + +If you have an existing `ACTOR`, you can port it to a C++ coroutine by following these steps: + +1. Remove `ACTOR` keyword. +2. If the actor is marked with `UNCANCELLABLE`, remove it and make the first argument `Uncancellable`. If the return + type of the actor is `void` make it `Future` instead and add an `Uncancellable` as the first argument. +3. Remove all `state` modifiers from local variables. +4. Replace all `wait(expr)` with `co_await expr`. +5. Remove all `waitNext(expr)` with `co_await expr`. +6. Rewrite existing `choose-when` statements using the `Choose` class. + +In addition, the following things should be looked out for: + +### Lifetime of locals + +Consider this code: + +```c++ +Local foo; +wait(bar()); +... +``` + +`foo` will be destroyed right after the `wait`-expression. However, after making this a coroutine: + +```c++ +Local foo; +co_await bar(); +... +``` + +`foo` will stay alive until we leave the scope. This is better (as it is more intuitive and follows standard C++), but +in some weird corner-cases code might depend on the semantic that locals get destroyed when we call into `wait`. Look +out for things where destructors do semantically important work (like in `FlowLock::Releaser`). + +### Unnecessary Helper Actors + +In `flow/genericactors.actor.h` we have a number of useful helpers. Some of them are also useful with C++ coroutines, +others add unnecessary overhead. Look out for those and remove calls to it. The most important ones are `success` and +`store`. + +```c++ +wait(success(f)); +``` + +becomes + +```c++ +co_await f; +``` + +and + +```c++ +wait(store(v, f)); +``` + +becomes + +```c++ +v = co_await f; +``` + +### Replace Locals with Temporaries + +In certain places we use locals just to work around actor compiler limitations. Since locals use up space in the +coroutine object they should be removed wherever it makes sense (only if it doesn't make the code less readable!). + +For example: + +```c++ +Foo f = wait(foo); +bar(f); +``` + +might become + +```c++ +bar(co_await foo); +``` + +### Don't Wait in Error-Handlers + +Using `co_await` in an error-handler produces a compilation error in C++. However, this was legal with `ACTOR`. There +is no general best way of addressing this issue, but usually it's quite easy to move the `co_await` expression out of +the `catch`-block. + +One place where we use this pattern a lot if in our transaction retry loop: + +```c++ +state ReadYourWritesTransaction tr(db); +loop { + try { + Value v = wait(tr.get(key)); + tr.set(key2, val2); + wait(tr.commit()); + return Void(); + } catch (Error& e) { + wait(tr.onError(e)); + } +} +``` + +Luckily, with coroutines, we can do one better: generalize the retry loop. The above could look like this: + +```c++ +co_await db.run([&](ReadYourWritesTransaction* tr) -> Future { + Value v = wait(tr.get(key)); + tr.set(key2, val2); + wait(tr.commit()); +}); +``` + +A possible implementation of `Database::run` would be: + +```c++ +template Fun> +Future Database::run(Fun fun) { + ReadYourWritesTransaction tr(*this); + Future onError; + while (true) { + if (onError.isValid()) { + co_await onError; + onError = Future(); + } + try { + co_await fun(&tr); + } catch (Error& e) { + onError = tr.onError(e); + } + } +} +``` + +### Make Static Functions Class Members + +With actors, we often see the following pattern: + +```c++ +struct Foo : IFoo { + ACTOR static Future bar(Foo* self) { + // use `self` here to access members of `Foo` + } + + Future bar() override { + return bar(this); + } +}; +``` + +This boilerplate is necessary, because `ACTOR`s can't be class members: the actor compiler will generate another +`struct` and move the code there -- so `this` will point to the actor state and not to the class instance. + +With C++ coroutines, this limitation goes away. So a cleaner (and slightly more efficient) implementation of the above +is: + +```c++ +struct Foo : IFoo { + Future bar() override { + // `this` can be used like in any non-coroutine. `co_await` can be used. + } +}; +``` + +### Initialization of Locals + +There is one very subtle and hard to spot difference between `ACTOR` and a coroutine: the way some local variables are +initialized. Consider the following code: + +```c++ +struct SomeStruct { + int a; + bool b; +}; + +ACTOR Future someActor() { + // beginning of body + state SomeStruct someStruct; + // rest of body +} +``` + +For state variables, the actor-compiler generates the following code to initialize `SomeStruct someStruct`: + +```c++ +someStruct = SomeStruct(); +``` + +This, however, is different from what might expect since now the default constructor is explicitly called. This means +if the code is translated to: + +```c++ +Future someActor() { + // beginning of body + SomeStruct someStruct; + // rest of body +} +``` + +initialization will be different. The exact equivalent instead would be something like this: + +```c++ +Future someActor() { + // beginning of body + SomeStruct someStruct{}; // auto someStruct = SomeStruct(); + // rest of body +} +``` + +If the struct `SomeStruct` would initialize its primitive members explicitly (for example by using `int a = 0;` and +`bool b = false`) this would be a non-issue. And explicit initialization is probably the right fix here. Sadly, it +doesn't seem like UBSAN finds these kind of subtle bugs. + +Another difference is, that if a `state` variables might be initialized twice: once at the creation of the actor using +the default constructor and a second time at the point where the variable is initialized in the code. With C++ +coroutines we now get the expected behavior, which is better, but nonetheless a potential behavior change. \ No newline at end of file diff --git a/documentation/CMakeLists.txt b/documentation/CMakeLists.txt index cd8e3100e6..b842bdf280 100644 --- a/documentation/CMakeLists.txt +++ b/documentation/CMakeLists.txt @@ -1,4 +1,5 @@ add_subdirectory(tutorial) +add_subdirectory(coro_tutorial) set(SPHINX_DOCUMENT_DIR "${CMAKE_SOURCE_DIR}/documentation/sphinx") diff --git a/documentation/coro_tutorial/CMakeLists.txt b/documentation/coro_tutorial/CMakeLists.txt new file mode 100644 index 0000000000..383bf5c9e4 --- /dev/null +++ b/documentation/coro_tutorial/CMakeLists.txt @@ -0,0 +1,2 @@ +add_flow_target(EXECUTABLE NAME coro_tutorial SRCS tutorial.cpp) +target_link_libraries(coro_tutorial PUBLIC fdbclient) \ No newline at end of file diff --git a/documentation/coro_tutorial/tutorial.cpp b/documentation/coro_tutorial/tutorial.cpp new file mode 100644 index 0000000000..b2217dc641 --- /dev/null +++ b/documentation/coro_tutorial/tutorial.cpp @@ -0,0 +1,714 @@ +/* + * tutorial.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2023 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fmt/format.h" +#include "flow/flow.h" +#include "flow/Platform.h" +#include "flow/DeterministicRandom.h" +#include "fdbclient/NativeAPI.actor.h" +#include "fdbclient/ReadYourWrites.h" +#include "flow/TLSConfig.actor.h" +#include "fdbrpc/Net2FileSystem.h" +#include +#include +#include +#include + +using namespace std::literals::string_literals; +using namespace std::literals::string_view_literals; + +NetworkAddress serverAddress; + +enum TutorialWellKnownEndpoints { + WLTOKEN_SIMPLE_KV_SERVER = WLTOKEN_FIRST_AVAILABLE, + WLTOKEN_ECHO_SERVER, + WLTOKEN_COUNT_IN_TUTORIAL +}; + +// this is a simple actor that will report how long +// it is already running once a second. +Future simpleTimer() { + // we need to remember the time when we first + // started. + double start_time = g_network->now(); + loop { + co_await delay(1.0); + std::cout << format("Time: %.2f\n", g_network->now() - start_time); + } +} + +// A actor that demonstrates how choose-when +// blocks work. +Future someFuture(Future ready) { + // loop choose {} works as well here - the braces are optional + loop { + co_await Choose() + .When(delay(0.5), [](Void const&) { std::cout << "Still waiting...\n"; }) + .When(ready, [](int const& r) { std::cout << format("Ready %d\n", r); }) + .run(); + } +} + +Future promiseDemo() { + Promise promise; + Future f = someFuture(promise.getFuture()); + co_await delay(3.0); + promise.send(2); + co_await f; +} + +Future eventLoop(AsyncTrigger* trigger) { + loop { + co_await Choose() + .When(delay(0.5), [](Void const&) { std::cout << "Still waiting...\n"; }) + .When(trigger->onTrigger(), [](Void const&) { std::cout << "Triggered!\n"; }) + .run(); + } +} + +Future triggerDemo() { + int runs = 1; + AsyncTrigger trigger; + auto triggerLoop = eventLoop(&trigger); + while (++runs < 10) { + co_await delay(1.0); + std::cout << "trigger.."; + trigger.trigger(); + } + std::cout << "Done."; +} + +struct EchoServerInterface { + constexpr static FileIdentifier file_identifier = 3152015; + RequestStream getInterface; + RequestStream echo; + RequestStream reverse; + RequestStream stream; + + template + void serialize(Ar& ar) { + serializer(ar, echo, reverse, stream); + } +}; + +struct GetInterfaceRequest { + constexpr static FileIdentifier file_identifier = 12004156; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, reply); + } +}; + +struct EchoRequest { + constexpr static FileIdentifier file_identifier = 10624019; + std::string message; + // this variable has to be called reply! + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, message, reply); + } +}; + +struct ReverseRequest { + constexpr static FileIdentifier file_identifier = 10765955; + std::string message; + // this variable has to be called reply! + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, message, reply); + } +}; + +struct StreamReply : ReplyPromiseStreamReply { + constexpr static FileIdentifier file_identifier = 440804; + + int index = 0; + StreamReply() = default; + explicit StreamReply(int index) : index(index) {} + + size_t expectedSize() const { return 2e6; } + + template + void serialize(Ar& ar) { + serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, ReplyPromiseStreamReply::sequence, index); + } +}; + +struct StreamRequest { + constexpr static FileIdentifier file_identifier = 5410805; + ReplyPromiseStream reply; + + template + void serialize(Ar& ar) { + serializer(ar, reply); + } +}; + +uint64_t tokenCounter = 1; + +Future echoServer() { + EchoServerInterface echoServer; + echoServer.getInterface.makeWellKnownEndpoint(WLTOKEN_ECHO_SERVER, TaskPriority::DefaultEndpoint); + ActorCollection requests; + loop { + try { + co_await Choose() + .When(requests.getResult(), + [](Void const&) { + // An actor collection with no constructor arguments or `false` as it's constructor argument + // will never finish. However, `getResult` will throw if any of the Futures we pass to it + // throw. So we have to wait on it, but we can assert that it either throws or is never ready + UNREACHABLE(); + }) + .When(echoServer.getInterface.getFuture(), + [&echoServer](GetInterfaceRequest const& req) { req.reply.send(echoServer); }) + .When(echoServer.echo.getFuture(), [](EchoRequest const& req) { req.reply.send(req.message); }) + .When(echoServer.reverse.getFuture(), + [](ReverseRequest const& req) { + req.reply.send(std::string(req.message.rbegin(), req.message.rend())); + }) + .When(echoServer.stream.getFuture(), + [&requests](StreamRequest const& req) { + requests.add([](StreamRequest req) -> Future { + req.reply.setByteLimit(1024); + int i = 0; + for (; i < 100; ++i) { + co_await req.reply.onReady(); + std::cout << "Send " << i << std::endl; + req.reply.send(StreamReply{ i }); + } + req.reply.sendError(end_of_stream()); + }(req)); + }) + .run(); + } catch (Error& e) { + if (e.code() != error_code_operation_obsolete) { + fprintf(stderr, "Error: %s\n", e.what()); + throw e; + } + } + } +} + +Future echoClient() { + EchoServerInterface server; + server.getInterface = + RequestStream(Endpoint::wellKnown({ serverAddress }, WLTOKEN_ECHO_SERVER)); + server = co_await server.getInterface.getReply(GetInterfaceRequest()); + EchoRequest echoRequest; + echoRequest.message = "Hello World"; + std::string echoMessage = co_await server.echo.getReply(echoRequest); + std::cout << format("Sent %s to echo, received %s\n", "Hello World", echoMessage.c_str()); + ReverseRequest reverseRequest; + reverseRequest.message = "Hello World"; + std::string reverseString = co_await server.reverse.getReply(reverseRequest); + std::cout << format("Sent %s to reverse, received %s\n", "Hello World", reverseString.c_str()); + + ReplyPromiseStream stream = server.stream.getReplyStream(StreamRequest{}); + int j = 0; + try { + loop { + StreamReply rep = co_await stream.getFuture(); + std::cout << "Rep: " << rep.index << std::endl; + ASSERT(rep.index == j++); + } + } catch (Error& e) { + ASSERT(e.code() == error_code_end_of_stream || e.code() == error_code_connection_failed); + } +} + +struct SimpleKeyValueStoreInterface { + constexpr static FileIdentifier file_identifier = 8226647; + RequestStream connect; + RequestStream get; + RequestStream set; + RequestStream clear; + + template + void serialize(Ar& ar) { + serializer(ar, connect, get, set, clear); + } +}; + +struct GetKVInterface { + constexpr static FileIdentifier file_identifier = 8062308; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, reply); + } +}; + +struct GetRequest { + constexpr static FileIdentifier file_identifier = 6983506; + std::string key; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, key, reply); + } +}; + +struct SetRequest { + constexpr static FileIdentifier file_identifier = 7554186; + std::string key; + std::string value; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, key, value, reply); + } +}; + +struct ClearRequest { + constexpr static FileIdentifier file_identifier = 8500026; + std::string from; + std::string to; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, from, to, reply); + } +}; + +Future kvStoreServer() { + SimpleKeyValueStoreInterface inf; + std::map store; + inf.connect.makeWellKnownEndpoint(WLTOKEN_SIMPLE_KV_SERVER, TaskPriority::DefaultEndpoint); + loop { + co_await Choose() + .When(inf.connect.getFuture(), + [&inf](GetKVInterface const& req) { + std::cout << "Received connection attempt\n"; + req.reply.send(inf); + }) + .When(inf.get.getFuture(), + [&store](GetRequest const& req) { + auto iter = store.find(req.key); + if (iter == store.end()) { + req.reply.sendError(io_error()); + } else { + req.reply.send(iter->second); + } + }) + .When(inf.set.getFuture(), + [&store](SetRequest const& req) { + store[req.key] = req.value; + req.reply.send(Void()); + }) + .When(inf.clear.getFuture(), + [&store](ClearRequest const& req) { + auto from = store.lower_bound(req.from); + auto to = store.lower_bound(req.to); + while (from != store.end() && from != to) { + auto next = from; + ++next; + store.erase(from); + from = next; + } + req.reply.send(Void()); + }) + .run(); + } +} + +Future connect() { + std::cout << format("%llu: Connect...\n", uint64_t(g_network->now())); + SimpleKeyValueStoreInterface c; + c.connect = RequestStream(Endpoint::wellKnown({ serverAddress }, WLTOKEN_SIMPLE_KV_SERVER)); + SimpleKeyValueStoreInterface result = co_await c.connect.getReply(GetKVInterface()); + std::cout << format("%llu: done..\n", uint64_t(g_network->now())); + co_return result; +} + +Future kvSimpleClient() { + SimpleKeyValueStoreInterface server = co_await connect(); + std::cout << format("Set %s -> %s\n", "foo", "bar"); + SetRequest setRequest; + setRequest.key = "foo"; + setRequest.value = "bar"; + co_await server.set.getReply(setRequest); + GetRequest getRequest; + getRequest.key = "foo"; + std::string value = co_await server.get.getReply(getRequest); + std::cout << format("get(%s) -> %s\n", "foo", value.c_str()); +} + +Future kvClient(SimpleKeyValueStoreInterface server, std::shared_ptr ops) { + auto timeout = delay(20); + int rangeSize = 2 << 12; + loop { + SetRequest setRequest; + setRequest.key = std::to_string(deterministicRandom()->randomInt(0, rangeSize)); + setRequest.value = "foo"; + co_await server.set.getReply(setRequest); + ++(*ops); + try { + GetRequest getRequest; + getRequest.key = std::to_string(deterministicRandom()->randomInt(0, rangeSize)); + co_await server.get.getReply(getRequest); + ++(*ops); + } catch (Error& e) { + if (e.code() != error_code_io_error) { + throw e; + } + } + int from = deterministicRandom()->randomInt(0, rangeSize); + ClearRequest clearRequest; + clearRequest.from = std::to_string(from); + clearRequest.to = std::to_string(from + 100); + co_await server.clear.getReply(clearRequest); + ++(*ops); + if (timeout.isReady()) { + // we are done + co_return; + } + } +} + +Future throughputMeasurement(std::shared_ptr operations) { + loop { + co_await delay(1.0); + std::cout << format("%llu op/s\n", *operations); + *operations = 0; + } +} + +Future multipleClients() { + SimpleKeyValueStoreInterface server = co_await connect(); + auto ops = std::make_shared(0); + std::vector> clients(100); + for (auto& f : clients) { + f = kvClient(server, ops); + } + auto done = waitForAll(clients); + co_await (done || throughputMeasurement(ops)); + co_return; +} + +std::string clusterFile = "fdb.cluster"; + +Future logThroughput(int64_t* v, Key* next) { + loop { + int64_t last = *v; + co_await delay(1); + fmt::print("throughput: {} bytes/s, next: {}\n", *v - last, printable(*next).c_str()); + } +} + +Future fdbClientStream() { + Database db = Database::createDatabase(clusterFile, 300); + Transaction tx(db); + Key next; + int64_t bytes = 0; + Future logFuture = logThroughput(&bytes, &next); + loop { + Future onError; + PromiseStream> results; + try { + Future stream = tx.getRangeStream(results, + KeySelector(firstGreaterOrEqual(next), next.arena()), + KeySelector(firstGreaterOrEqual(normalKeys.end)), + GetRangeLimits()); + loop { + Standalone range = co_await results.getFuture(); + if (range.size()) { + bytes += range.expectedSize(); + next = keyAfter(range.back().key); + } + } + } catch (Error& e) { + if (e.code() == error_code_end_of_stream) { + break; + } + onError = tx.onError(e); + } + co_await onError; + } +} + +bool transactionDone(std::convertible_to auto v) { + return v; +} + +bool transaction_done(void) { + return true; +} + +template +Future runTransactionWhile(DB const& db, Fun f) { + Transaction tr(db); + loop { + Future onError; + try { + if (transactionDone(co_await f(&tr))) { + co_return; + } + } catch (Error& e) { + onError = tr.onError(e); + } + co_await onError; + } +} + +template +Future runTransaction(DB const& db, Fun f) { + return runTransactionWhile(db, [&f](Transaction* tr) -> Future { + co_await f(tr); + co_return true; + }); +} + +template +Future runRYWTransaction(DB const& db, Fun f) { + Future onError; + ReadYourWritesTransaction tr(db); + loop { + if (onError.isValid()) { + co_await onError; + onError = Future(); + } + try { + co_await f(&tr); + co_return; + } catch (Error& e) { + onError = tr.onError(e); + } + } +} + +Future fdbClientGetRange() { + Database db = Database::createDatabase(clusterFile, 300); + Transaction tx(db); + Key next; + int64_t bytes = 0; + Future logFuture = logThroughput(&bytes, &next); + co_await runTransactionWhile(db, [&bytes, &next](Transaction* tr) -> Future { + RangeResult range = + co_await tr->getRange(KeySelector(firstGreaterOrEqual(next), next.arena()), + KeySelector(firstGreaterOrEqual(normalKeys.end)), + GetRangeLimits(GetRangeLimits::ROW_LIMIT_UNLIMITED, CLIENT_KNOBS->REPLY_BYTE_LIMIT)); + bytes += range.expectedSize(); + if (!range.more) { + co_return true; + } + next = keyAfter(range.back().key); + co_return false; + }); + co_return; +} + +Future fdbClient() { + co_await delay(30); + Database db = Database::createDatabase(clusterFile, 300); + std::string keyPrefix = "/tut/"; + Key startKey; + KeyRef endKey = "/tut0"_sr; + int beginIdx = 0; + loop { + co_await runTransaction(db, [&](Transaction* tr) -> Future { + // this workload is stupidly simple: + // 1. select a random key between 1 + // and 1e8 + // 2. select this key plus the 100 + // next ones + // 3. write 10 values in [k, k+100] + beginIdx = deterministicRandom()->randomInt(0, 1e8 - 100); + startKey = keyPrefix + std::to_string(beginIdx); + auto range = co_await tr->getRange(KeyRangeRef(startKey, endKey), 100); + for (int i = 0; i < 10; ++i) { + Key k = Key(keyPrefix + std::to_string(beginIdx + deterministicRandom()->randomInt(0, 100))); + tr->set(k, "foo"_sr); + } + co_await tr->commit(); + std::cout << "Committed\n"; + co_await delay(2.0); + co_return; + }); + } +} + +Future fdbStatusStresser() { + Database db = Database::createDatabase(clusterFile, 300); + Key statusJson(std::string("\xff\xff/status/json")); + loop { + co_await runRYWTransaction(db, [&statusJson](ReadYourWritesTransaction* tr) -> Future { + co_await tr->get(statusJson); + co_return; + }); + } +} + +AsyncGenerator> readBlocks(Reference file, int64_t blockSize) { + auto sz = co_await file->size(); + decltype(sz) offset = 0; + Arena arena; + auto block = new (arena) uint8_t[blockSize]; + while (offset < sz) { + auto read = co_await file->read(block, blockSize, offset); + offset += read; + co_yield StringRef(block, read); + } + while (true) { + co_yield Optional{}; + } +} + +AsyncGenerator> readLines(Reference file) { + auto blocks = readBlocks(file, 4 * 1024); + Arena arena; + StringRef lastLine; + loop { + auto optionalBlock = co_await blocks(); + if (!optionalBlock.present()) { + if (lastLine.empty()) { + co_yield Optional{}; + } else { + co_yield lastLine; + lastLine = {}; + arena = Arena(); + co_return; + } + } + StringRef block = optionalBlock.get(); + auto endsWithNewLine = block.back() == uint8_t('\n'); + while (!block.empty()) { + if (!lastLine.empty()) [[unlikely]] { + concatenateStrings(arena, lastLine, block.eatAny("\n"_sr, nullptr)); + if (!block.empty() || endsWithNewLine) { + co_yield lastLine; + lastLine = StringRef(); + arena = Arena(); + } + } else { + auto line = block.eatAny("\n"_sr, nullptr); + if (block.empty() && !endsWithNewLine) { + lastLine = StringRef(arena, line); + } else { + co_yield line; + } + } + } + } +} + +Future testReadLines() { + auto path = "/etc/hosts"s; + auto file = co_await IAsyncFileSystem::filesystem()->open(path, IAsyncFile::OPEN_READWRITE, 0640); + auto lines = readLines(file); + for (int i = 0; true; ++i) { + auto line = co_await lines(); + if (!line.present()) { + break; + } + fmt::print("{}: {}\n", i, line.get()); + } +} + +// readLines -> Stream of lines of a text file + +std::unordered_map()>> actors = { + { "timer", &simpleTimer }, // ./tutorial timer + { "promiseDemo", &promiseDemo }, // ./tutorial promiseDemo + { "triggerDemo", &triggerDemo }, // ./tutorial triggerDemo + { "echoServer", &echoServer }, // ./tutorial -p 6666 echoServer + { "echoClient", &echoClient }, // ./tutorial -s 127.0.0.1:6666 echoClient + { "kvStoreServer", &kvStoreServer }, // ./tutorial -p 6666 kvStoreServer + { "kvSimpleClient", &kvSimpleClient }, // ./tutorial -s 127.0.0.1:6666 kvSimpleClient + { "multipleClients", &multipleClients }, // ./tutorial -s 127.0.0.1:6666 multipleClients + { "fdbClientStream", &fdbClientStream }, // ./tutorial -C $CLUSTER_FILE_PATH fdbClientStream + { "fdbClientGetRange", &fdbClientGetRange }, // ./tutorial -C $CLUSTER_FILE_PATH fdbClientGetRange + { "fdbClient", &fdbClient }, // ./tutorial -C $CLUSTER_FILE_PATH fdbClient + { "fdbStatusStresser", &fdbStatusStresser }, + { "testReadLines", &testReadLines } +}; // ./tutorial -C $CLUSTER_FILE_PATH fdbStatusStresser + +int main(int argc, char* argv[]) { + bool isServer = false; + std::string port; + std::vector()>> toRun; + // parse arguments + for (int i = 1; i < argc; ++i) { + std::string arg(argv[i]); + if (arg == "-p") { + isServer = true; + if (i + 1 >= argc) { + std::cout << "Expecting an argument after -p\n"; + return 1; + } + port = std::string(argv[++i]); + continue; + } else if (arg == "-s") { + if (i + 1 >= argc) { + std::cout << "Expecting an argument after -s\n"; + return 1; + } + serverAddress = NetworkAddress::parse(argv[++i]); + continue; + } else if (arg == "-C") { + clusterFile = argv[++i]; + std::cout << "Using cluster file " << clusterFile << std::endl; + continue; + } + auto actor = actors.find(arg); + if (actor == actors.end()) { + std::cout << format("Error: actor %s does not exist\n", arg.c_str()); + return 1; + } + toRun.push_back(actor->second); + } + platformInit(); + g_network = newNet2(TLSConfig(), false, true); + FlowTransport::createInstance(!isServer, 0, WLTOKEN_COUNT_IN_TUTORIAL); + NetworkAddress publicAddress = NetworkAddress::parse("0.0.0.0:0"); + if (isServer) { + publicAddress = NetworkAddress::parse("0.0.0.0:" + port); + } + try { + if (isServer) { + auto listenError = FlowTransport::transport().bind(publicAddress, publicAddress); + if (listenError.isError()) { + listenError.get(); + } + } + } catch (Error& e) { + std::cout << format("Error while binding to address (%d): %s\n", e.code(), e.what()); + } + Net2FileSystem::newFileSystem(-10, ""); + // now we start the actors + std::vector> all; + all.reserve(toRun.size()); + for (auto& f : toRun) { + all.emplace_back(f()); + } + auto f = stopAfter(waitForAll(all)); + g_network->run(); + return 0; +} diff --git a/fdbrpc/CoroTests.cpp b/fdbrpc/CoroTests.cpp new file mode 100644 index 0000000000..ca037a5af3 --- /dev/null +++ b/fdbrpc/CoroTests.cpp @@ -0,0 +1,1914 @@ +/* + * CoroTests.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2023 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "flow/UnitTest.h" +#include "flow/IAsyncFile.h" +#include "fdbrpc/fdbrpc.h" +#include "flow/TLSConfig.actor.h" + +#include +#include +#include +#include + +void forceLinkCoroTests() {} + +using namespace std::literals::string_literals; + +TEST_CASE("/flow/coro/buggifiedDelay") { + if (FLOW_KNOBS->MAX_BUGGIFIED_DELAY == 0) { + co_return; + } + loop { + double x = deterministicRandom()->random01(); + int last = 0; + Future f1 = map(delay(x), [last = &last](const Void&) { + *last = 1; + return Void(); + }); + Future f2 = map(delay(x), [last = &last](const Void&) { + *last = 2; + return Void(); + }); + co_await (f1 && f2); + if (last == 1) { + CODE_PROBE(true, "Delays can become ready out of order", probe::decoration::rare); + co_return; + } + } +} + +template +class LambdaCallback final : public CallbackType, public FastAllocated> { + Func func; + ErrFunc errFunc; + + void fire(T const& t) override { + CallbackType::remove(); + func(t); + delete this; + } + void fire(T&& t) override { + CallbackType::remove(); + func(std::move(t)); + delete this; + } + void error(Error e) override { + CallbackType::remove(); + errFunc(e); + delete this; + } + +public: + LambdaCallback(Func&& f, ErrFunc&& e) : func(std::move(f)), errFunc(std::move(e)) {} +}; + +template +void onReady(Future&& f, Func&& func, ErrFunc&& errFunc) { + if (f.isReady()) { + if (f.isError()) + errFunc(f.getError()); + else + func(f.get()); + } else + f.addCallbackAndClear(new LambdaCallback>(std::move(func), std::move(errFunc))); +} + +template +void onReady(FutureStream&& f, Func&& func, ErrFunc&& errFunc) { + if (f.isReady()) { + if (f.isError()) + errFunc(f.getError()); + else + func(f.pop()); + } else + f.addCallbackAndClear( + new LambdaCallback>(std::move(func), std::move(errFunc))); +} + +namespace { +void emptyVoidActor() {} + +Future emptyActor() { + return Void(); +} + +Future oneWaitVoidActor(Uncancellable, Future f) { + co_await f; + co_return; +} + +Future oneWaitActor(Future f) { + co_return co_await f; +} + +Future g_cheese; +Future cheeseWaitActor() { + co_await g_cheese; + co_return; +} + +void trivialVoidActor(int* result) { + *result = 1; +} + +Future return42Actor() { + return 42; +} + +Future voidWaitActor(Uncancellable, Future in, int* result) { + int i = co_await in; + *result = i; +} + +Future addOneActor(Future in) { + co_return (co_await in) + 1; +} + +Future chooseTwoActor(Future f, Future g) { + co_return co_await Choose().When(f, [](const Void&) {}).When(g, [](const Void&) {}).run(); +} + +Future consumeOneActor(FutureStream in) { + int i = co_await in; + co_return i; +} + +Future sumActor(FutureStream in) { + int total = 0; + try { + loop { + int i = co_await in; + total += i; + } + } catch (Error& e) { + if (e.code() != error_code_end_of_stream) + throw; + } + co_return total; +} + +template +Future templateActor(T t) { + return t; +} + +// bool expectActorCount(int x) { return actorCount == x; } +bool expectActorCount(int) { + return true; +} + +struct YieldMockNetwork final : INetwork, ReferenceCounted { + int ticks; + Promise nextTick; + int nextYield; + INetwork* baseNetwork; + + flowGlobalType global(int id) const override { return baseNetwork->global(id); } + void setGlobal(size_t id, flowGlobalType v) override { + baseNetwork->setGlobal(id, v); + return; + } + + YieldMockNetwork() : ticks(0), nextYield(0) { + baseNetwork = g_network; + g_network = this; + } + ~YieldMockNetwork() { g_network = baseNetwork; } + + void tick() { + ticks++; + Promise t; + t.swap(nextTick); + t.send(Void()); + } + + Future delay(double seconds, TaskPriority taskID) override { return nextTick.getFuture(); } + + Future orderedDelay(double seconds, TaskPriority taskID) override { return nextTick.getFuture(); } + + Future yield(TaskPriority taskID) override { + if (check_yield(taskID)) + return delay(0, taskID); + return Void(); + } + + bool check_yield(TaskPriority taskID) override { + if (nextYield > 0) + --nextYield; + return nextYield == 0; + } + + // Delegate everything else. TODO: Make a base class NetworkWrapper for delegating everything in INetwork + TaskPriority getCurrentTask() const override { return baseNetwork->getCurrentTask(); } + void setCurrentTask(TaskPriority taskID) override { baseNetwork->setCurrentTask(taskID); } + double now() const override { return baseNetwork->now(); } + double timer() override { return baseNetwork->timer(); } + double timer_monotonic() override { return baseNetwork->timer_monotonic(); } + void stop() override { return baseNetwork->stop(); } + void addStopCallback(std::function fn) override { + ASSERT(false); + return; + } + bool isSimulated() const override { return baseNetwork->isSimulated(); } + void onMainThread(Promise&& signal, TaskPriority taskID) override { + return baseNetwork->onMainThread(std::move(signal), taskID); + } + bool isOnMainThread() const override { return baseNetwork->isOnMainThread(); } + THREAD_HANDLE startThread(THREAD_FUNC_RETURN (*func)(void*), void* arg, int stackSize, const char* name) override { + return baseNetwork->startThread(func, arg, stackSize, name); + } + Future> open(std::string filename, int64_t flags, int64_t mode) { + return IAsyncFileSystem::filesystem()->open(filename, flags, mode); + } + Future deleteFile(std::string filename, bool mustBeDurable) { + return IAsyncFileSystem::filesystem()->deleteFile(filename, mustBeDurable); + } + void run() override { return baseNetwork->run(); } + bool checkRunnable() override { return baseNetwork->checkRunnable(); } + void getDiskBytes(std::string const& directory, int64_t& free, int64_t& total) override { + return baseNetwork->getDiskBytes(directory, free, total); + } + bool isAddressOnThisHost(NetworkAddress const& addr) const override { + return baseNetwork->isAddressOnThisHost(addr); + } + const TLSConfig& getTLSConfig() const override { + static TLSConfig emptyConfig; + return emptyConfig; + } +#ifdef ENABLE_SAMPLING + ActorLineageSet& getActorLineageSet() override { throw std::exception(); } +#endif + ProtocolVersion protocolVersion() const override { return baseNetwork->protocolVersion(); } + void _swiftEnqueue(void* task) override { baseNetwork->_swiftEnqueue(task); } +}; + +Future testCancelled(bool* exits, Future f) { + Error err = success(); + try { + co_await Future(Never()); + } catch (Error& e) { + err = e; + } + try { + co_await Future(Never()); + } catch (Error& e) { + *exits = true; + throw; + } + if (err.code() != error_code_success) { + throw err; + } +} +} // namespace + +TEST_CASE("/flow/coro/cancel1") { + bool exits = false; + Promise p; + Future test = testCancelled(&exits, p.getFuture()); + ASSERT(p.getPromiseReferenceCount() == 1 && p.getFutureReferenceCount() == 1); + test.cancel(); + ASSERT(exits); + ASSERT(test.getPromiseReferenceCount() == 0 && test.getFutureReferenceCount() == 1 && test.isReady() && + test.isError() && test.getError().code() == error_code_actor_cancelled); + ASSERT(p.getPromiseReferenceCount() == 1 && p.getFutureReferenceCount() == 0); + + return Void(); +} + +namespace { + +Future noteCancel(int* cancelled) { + *cancelled = 0; + try { + co_await Future(Never()); + throw internal_error(); + } catch (...) { + printf("Cancelled!\n"); + *cancelled = 1; + throw; + } +} + +} // namespace + +TEST_CASE("/flow/coro/cancel2") { + int c1 = 0, c2 = 0, c3 = 0; + + Future cf = noteCancel(&c1); + ASSERT(c1 == 0); + cf = Future(); + ASSERT(c1 == 1); + + cf = noteCancel(&c2) && noteCancel(&c3); + ASSERT(c2 == 0 && c3 == 0); + cf = Future(); + ASSERT(c2 == 1 && c3 == 1); + return Void(); +} + +TEST_CASE("/flow/coro/trivial_actors") { + ASSERT(expectActorCount(0)); + + int result = 0; + trivialVoidActor(&result); + ASSERT(result == 1); + + Future f = return42Actor(); + ASSERT(f.isReady() && !f.isError() && f.get() == 42 && f.getFutureReferenceCount() == 1 && + f.getPromiseReferenceCount() == 0); + f = Future(); + + f = templateActor(24); + ASSERT(f.isReady() && !f.isError() && f.get() == 24 && f.getFutureReferenceCount() == 1 && + f.getPromiseReferenceCount() == 0); + f = Future(); + + result = 0; + voidWaitActor(Uncancellable(), 2, &result); + ASSERT(result == 2 && expectActorCount(0)); + + Promise p; + f = addOneActor(p.getFuture()); + ASSERT(!f.isReady() && expectActorCount(1)); + p.send(100); + ASSERT(f.isReady() && f.get() == 101); + f = Future(); + + PromiseStream ps; + f = consumeOneActor(ps.getFuture()); + ASSERT(!f.isReady() && expectActorCount(1)); + ps.send(101); + ASSERT(f.get() == 101 && ps.isEmpty()); + ps.send(102); + ASSERT(!ps.isEmpty()); + f = consumeOneActor(ps.getFuture()); + ASSERT(f.get() == 102 && ps.isEmpty()); + + f = sumActor(ps.getFuture()); + ps.send(1); + ps.send(10); + ps.send(100); + ps.sendError(end_of_stream()); + ASSERT(f.get() == 111); + + return Void(); +} + +TEST_CASE("/flow/coro/yieldedFuture/progress") { + // Check that if check_yield always returns true, the yieldedFuture will do nothing immediately but will + // get one thing done per "tick" (per delay(0) returning). + + auto yn = makeReference(); + + yn->nextYield = 0; + + Promise p; + Future u = p.getFuture(); + Future i = success(u); + + std::vector> v; + for (int j = 0; j < 5; j++) + v.push_back(yieldedFuture(u)); + auto numReady = [&v]() { return std::count_if(v.begin(), v.end(), [](Future v) { return v.isReady(); }); }; + + ASSERT(numReady() == 0); + p.send(Void()); + ASSERT(u.isReady() && i.isReady() && numReady() == 0); + + for (int j = 0; j < 5; j++) { + yn->tick(); + ASSERT(numReady() == j + 1); + } + + for (int j = 0; j < 5; j++) { + ASSERT(v[j].getPromiseReferenceCount() == 0 && v[j].getFutureReferenceCount() == 1); + } + + return Void(); +} + +TEST_CASE("/flow/coro/yieldedFuture/random") { + // Check expectations about exactly how yieldedFuture responds to check_yield results + + auto yn = makeReference(); + + for (int r = 0; r < 100; r++) { + Promise p; + Future u = p.getFuture(); + Future i = success(u); + + std::vector> v; + for (int j = 0; j < 25; j++) + v.push_back(yieldedFuture(u)); + auto numReady = [&v]() { + return std::count_if(v.begin(), v.end(), [](Future v) { return v.isReady(); }); + }; + + Future j = success(u); + + ASSERT(numReady() == 0); + + int expectYield = deterministicRandom()->randomInt(0, 4); + int expectReady = expectYield; + yn->nextYield = 1 + expectYield; + + p.send(Void()); + ASSERT(u.isReady() && i.isReady() && j.isReady() && numReady() == expectReady); + + while (numReady() != v.size()) { + expectYield = deterministicRandom()->randomInt(0, 4); + yn->nextYield = 1 + expectYield; + expectReady += 1 + expectYield; + yn->tick(); + // printf("Yielding %d times, expect %d/%d ready, got %d\n", expectYield, expectReady, v.size(), numReady() + // ); + ASSERT(numReady() == std::min(expectReady, v.size())); + } + + for (int k = 0; k < v.size(); k++) { + ASSERT(v[k].getPromiseReferenceCount() == 0 && v[k].getFutureReferenceCount() == 1); + } + } + + return Void(); +} + +TEST_CASE("/flow/coro/perf/yieldedFuture") { + double start; + int N = 1000000; + + auto yn = makeReference(); + + yn->nextYield = 2 * N + 100; + + Promise p; + Future f = p.getFuture(); + std::vector> ys; + + start = timer(); + for (int i = 0; i < N; i++) + ys.push_back(yieldedFuture(f)); + printf("yieldedFuture(f) create: %0.1f M/sec\n", N / 1e6 / (timer() - start)); + p.send(Void()); + printf("yieldedFuture(f) total: %0.1f M/sec\n", N / 1e6 / (timer() - start)); + + for (auto& y : ys) + ASSERT(y.isReady()); + + p = Promise(); + f = p.getFuture(); + + start = timer(); + for (int i = 0; i < N; i++) { + // We're only measuring how long it takes to cancel things, so no need to run any of them + (void)yieldedFuture(f); + } + printf("yieldedFuture(f) cancel: %0.1f M/sec\n", N / 1e6 / (timer() - start)); + + return Void(); +} + +TEST_CASE("/flow/coro/chooseTwoActor") { + ASSERT(expectActorCount(0)); + + Promise a, b; + Future c = chooseTwoActor(a.getFuture(), b.getFuture()); + ASSERT(a.getFutureReferenceCount() == 2 && b.getFutureReferenceCount() == 2 && !c.isReady()); + b.send(Void()); + ASSERT(a.getFutureReferenceCount() == 0 && b.getFutureReferenceCount() == 0 && c.isReady() && !c.isError() && + expectActorCount(1)); + c = Future(); + ASSERT(a.getFutureReferenceCount() == 0 && b.getFutureReferenceCount() == 0 && expectActorCount(0)); + return Void(); +} + +TEST_CASE("#flow/coro/perf/actor patterns") { + double start; + int N = 1000000; + + ASSERT(expectActorCount(0)); + + start = timer(); + for (int i = 0; i < N; i++) + emptyVoidActor(); + printf("emptyVoidActor(): %0.1f M/sec\n", N / 1e6 / (timer() - start)); + + ASSERT(expectActorCount(0)); + + start = timer(); + for (int i = 0; i < N; i++) { + emptyActor(); + } + printf("emptyActor(): %0.1f M/sec\n", N / 1e6 / (timer() - start)); + + ASSERT(expectActorCount(0)); + + Promise neverSet; + Future never = neverSet.getFuture(); + Future already = Void(); + + start = timer(); + for (int i = 0; i < N; i++) + oneWaitVoidActor(Uncancellable(), already); + printf("oneWaitVoidActor(already): %0.1f M/sec\n", N / 1e6 / (timer() - start)); + + ASSERT(expectActorCount(0)); + + /*start = timer(); + for (int i = 0; i < N; i++) + oneWaitVoidActor(never); + printf("oneWaitVoidActor(never): %0.1f M/sec\n", N / 1e6 / (timer() - start));*/ + + { + start = timer(); + for (int i = 0; i < N; i++) { + Future f = oneWaitActor(already); + ASSERT(f.isReady()); + } + printf("oneWaitActor(already): %0.1f M/sec\n", N / 1e6 / (timer() - start)); + } + + { + start = timer(); + for (int i = 0; i < N; i++) { + Future f = oneWaitActor(never); + ASSERT(!f.isReady()); + } + printf("(cancelled) oneWaitActor(never): %0.1f M/sec\n", N / 1e6 / (timer() - start)); + ASSERT(expectActorCount(0)); + } + + { + start = timer(); + for (int i = 0; i < N; i++) { + Promise p; + Future f = oneWaitActor(p.getFuture()); + p.send(Void()); + ASSERT(f.isReady()); + } + printf("oneWaitActor(after): %0.1f M/sec\n", N / 1e6 / (timer() - start)); + } + + { + std::vector> pipe(N); + std::vector> out(N); + start = timer(); + for (int i = 0; i < N; i++) { + out[i] = oneWaitActor(pipe[i].getFuture()); + } + for (int i = 0; i < N; i++) { + pipe[i].send(Void()); + ASSERT(out[i].isReady()); + } + printf("oneWaitActor(fifo): %0.1f M/sec\n", N / 1e6 / (timer() - start)); + } + + { + std::vector> pipe(N); + std::vector> out(N); + start = timer(); + for (int i = 0; i < N; i++) { + out[i] = oneWaitActor(pipe[i].getFuture()); + } + for (int i = N - 1; i >= 0; i--) { + pipe[i].send(Void()); + ASSERT(out[i].isReady()); + } + printf("oneWaitActor(lifo): %0.1f M/sec\n", N / 1e6 / (timer() - start)); + } + + { + start = timer(); + for (int i = 0; i < N; i++) { + Future f = chooseTwoActor(already, already); + ASSERT(f.isReady()); + } + printf("chooseTwoActor(already, already): %0.1f M/sec\n", N / 1e6 / (timer() - start)); + } + + { + start = timer(); + for (int i = 0; i < N; i++) { + Future f = chooseTwoActor(already, never); + ASSERT(f.isReady()); + } + printf("chooseTwoActor(already, never): %0.1f M/sec\n", N / 1e6 / (timer() - start)); + } + + { + start = timer(); + for (int i = 0; i < N; i++) { + Future f = chooseTwoActor(never, already); + ASSERT(f.isReady()); + } + printf("chooseTwoActor(never, already): %0.1f M/sec\n", N / 1e6 / (timer() - start)); + } + + { + start = timer(); + for (int i = 0; i < N; i++) { + Future f = chooseTwoActor(never, never); + ASSERT(!f.isReady()); + } + // ASSERT(expectActorCount(0)); + printf("(cancelled) chooseTwoActor(never, never): %0.1f M/sec\n", N / 1e6 / (timer() - start)); + } + + { + start = timer(); + for (int i = 0; i < N; i++) { + Promise p; + Future f = chooseTwoActor(p.getFuture(), never); + p.send(Void()); + ASSERT(f.isReady()); + } + printf("chooseTwoActor(after, never): %0.1f M/sec\n", N / 1e6 / (timer() - start)); + } + + { + std::vector> pipe(N); + std::vector> out(N); + start = timer(); + for (int i = 0; i < N; i++) { + out[i] = chooseTwoActor(pipe[i].getFuture(), never); + } + for (int i = 0; i < N; i++) { + pipe[i].send(Void()); + ASSERT(out[i].isReady()); + } + printf("chooseTwoActor(fifo, never): %0.1f M/sec\n", N / 1e6 / (timer() - start)); + } + + { + std::vector> pipe(N); + std::vector> out(N); + start = timer(); + for (int i = 0; i < N; i++) { + out[i] = chooseTwoActor(pipe[i].getFuture(), pipe[i].getFuture()); + } + for (int i = 0; i < N; i++) { + pipe[i].send(Void()); + ASSERT(out[i].isReady()); + } + printf("chooseTwoActor(fifo, fifo): %0.1f M/sec\n", N / 1e6 / (timer() - start)); + } + + { + std::vector> pipe(N); + std::vector> out(N); + start = timer(); + for (int i = 0; i < N; i++) { + out[i] = chooseTwoActor(chooseTwoActor(pipe[i].getFuture(), never), never); + } + for (int i = 0; i < N; i++) { + pipe[i].send(Void()); + ASSERT(out[i].isReady()); + } + printf("chooseTwoActor^2((fifo, never), never): %0.1f M/sec\n", N / 1e6 / (timer() - start)); + } + + { + start = timer(); + for (int i = 0; i < N; i++) { + Promise p; + Future f = oneWaitActor(chooseTwoActor(p.getFuture(), never)); + p.send(Void()); + ASSERT(f.isReady()); + } + printf("oneWaitActor(chooseTwoActor(after, never)): %0.1f M/sec\n", N / 1e6 / (timer() - start)); + } + + { + std::vector> pipe(N); + std::vector> out(N); + start = timer(); + for (int i = 0; i < N; i++) { + out[i] = oneWaitActor(chooseTwoActor(pipe[i].getFuture(), never)); + } + for (int i = 0; i < N; i++) { + pipe[i].send(Void()); + ASSERT(out[i].isReady()); + } + printf("oneWaitActor(chooseTwoActor(fifo, never)): %0.1f M/sec\n", N / 1e6 / (timer() - start)); + } + + { + start = timer(); + for (int i = 0; i < N; i++) { + Promise p; + Future f = chooseTwoActor(p.getFuture(), never); + Future a = oneWaitActor(f); + Future b = oneWaitActor(f); + p.send(Void()); + ASSERT(f.isReady()); + } + printf("2xoneWaitActor(chooseTwoActor(after, never)): %0.2f M/sec\n", N / 1e6 / (timer() - start)); + } + + { + std::vector> pipe(N); + std::vector> out1(N); + std::vector> out2(N); + start = timer(); + for (int i = 0; i < N; i++) { + Future f = chooseTwoActor(pipe[i].getFuture(), never); + out1[i] = oneWaitActor(f); + out2[i] = oneWaitActor(f); + } + for (int i = 0; i < N; i++) { + pipe[i].send(Void()); + ASSERT(out2[i].isReady()); + } + printf("2xoneWaitActor(chooseTwoActor(fifo, never)): %0.2f M/sec\n", N / 1e6 / (timer() - start)); + } + + { + std::vector> pipe(N); + std::vector> out1(N); + std::vector> out2(N); + start = timer(); + for (int i = 0; i < N; i++) { + Future f = chooseTwoActor(oneWaitActor(pipe[i].getFuture()), never); + out1[i] = oneWaitActor(f); + out2[i] = oneWaitActor(f); + } + for (int i = 0; i < N; i++) { + pipe[i].send(Void()); + ASSERT(out2[i].isReady()); + } + printf("2xoneWaitActor(chooseTwoActor(oneWaitActor(fifo), never)): %0.2f M/sec\n", N / 1e6 / (timer() - start)); + } + + { + std::vector> pipe(N); + std::vector> out1(N); + std::vector> out2(N); + start = timer(); + for (int i = 0; i < N; i++) { + g_cheese = pipe[i].getFuture(); + Future f = chooseTwoActor(cheeseWaitActor(), never); + g_cheese = f; + out1[i] = cheeseWaitActor(); + out2[i] = cheeseWaitActor(); + } + for (int i = 0; i < N; i++) { + pipe[i].send(Void()); + ASSERT(out2[i].isReady()); + } + printf("2xcheeseActor(chooseTwoActor(cheeseActor(fifo), never)): %0.2f M/sec\n", N / 1e6 / (timer() - start)); + // printf("sizeof(CheeseWaitActorActor) == %zu\n", cheeseWaitActorSize()); + } + + { + PromiseStream data; + start = timer(); + Future sum = sumActor(data.getFuture()); + for (int i = 0; i < N; i++) + data.send(1); + data.sendError(end_of_stream()); + ASSERT(sum.get() == N); + printf("sumActor: %0.2f M/sec\n", N / 1e6 / (timer() - start)); + } + + { + start = timer(); + std::vector> ps(3); + std::vector> fs(3); + + for (int i = 0; i < N; i++) { + ps.clear(); + ps.resize(3); + for (int j = 0; j < ps.size(); j++) + fs[j] = ps[j].getFuture(); + + Future q = quorum(fs, 2); + for (auto& p : ps) + p.send(Void()); + } + printf("quorum(2/3): %0.2f M/sec\n", N / 1e6 / (timer() - start)); + } + + return Void(); +} + +namespace { + +template +struct YAMRandom { + YAM yam; + std::vector> onchanges; + int kmax; + + YAMRandom() : kmax(3) {} + + void randomOp() { + if (deterministicRandom()->random01() < 0.01) + while (!check_yield()) + ; + + int k = deterministicRandom()->randomInt(0, kmax); + int op = deterministicRandom()->randomInt(0, 7); + // printf("%d",op); + if (op == 0) { + onchanges.push_back(yam.onChange(k)); + } else if (op == 1) { + onchanges.push_back(trigger([this]() { this->randomOp(); }, yam.onChange(k))); + } else if (op == 2) { + if (onchanges.size()) { + int i = deterministicRandom()->randomInt(0, onchanges.size()); + onchanges[i] = onchanges.back(); + onchanges.pop_back(); + } + } else if (op == 3) { + onchanges.clear(); + } else if (op == 4) { + int v = deterministicRandom()->randomInt(0, 3); + yam.set(k, v); + } else if (op == 5) { + yam.trigger(k); + } else if (op == 6) { + int a = deterministicRandom()->randomInt(0, kmax); + int b = deterministicRandom()->randomInt(0, kmax); + yam.triggerRange(std::min(a, b), std::max(a, b) + 1); + } + } +}; + +} // namespace + +TEST_CASE("/flow/coro/long_loop") { + uint64_t res = 0; + uint64_t n = 100'000; + for (decltype(n) i = 0; i < n; ++i) { + if (i < 99'999) { + // prevent compiler optimizations by doing something "useful" in the loop + res += i + 1; + continue; + } + co_await delay(0.1); + } + ASSERT(res == n * (n - 1) / 2); +} + +TEST_CASE("/flow/coro/YieldedAsyncMap/randomized") { + YAMRandom> yamr; + int it; + for (it = 0; it < 100000; it++) { + yamr.randomOp(); + co_await yield(); + if (it % 100 == 0) { + fmt::print("/flow/coro/YieldedAsyncMap/randomized iteration {}\n", it); + } + } +} + +TEST_CASE("/flow/coro/AsyncMap/randomized") { + YAMRandom> yamr; + for (int it = 0; it < 100000; it++) { + yamr.randomOp(); + co_await yield(); + } +} + +TEST_CASE("/flow/coro/YieldedAsyncMap/basic") { + YieldedAsyncMap yam; + Future y0 = yam.onChange(1); + yam.setUnconditional(1, 0); + Future y1 = yam.onChange(1); + Future y1a = yam.onChange(1); + Future y1b = yam.onChange(1); + yam.set(1, 1); + // while (!check_yield()) {} + // yam.triggerRange(0, 4); + + Future y2 = yam.onChange(1); + co_await reportErrors(y0, "Y0"); + co_await reportErrors(y1, "Y1"); + co_await reportErrors(y1a, "Y1a"); + co_await reportErrors(y1b, "Y1b"); + co_await reportErrors(timeout(y2, 5, Void()), "Y2"); +} + +TEST_CASE("/flow/coro/YieldedAsyncMap/cancel") { + YieldedAsyncMap yam; + // ASSERT(yam.count(1) == 0); + // state Future y0 = yam.onChange(1); + // ASSERT(yam.count(1) == 1); + // yam.setUnconditional(1, 0); + + ASSERT(yam.count(1) == 0); + Future y1 = yam.onChange(1); + Future y1a = yam.onChange(1); + Future y1b = yam.onChange(1); + ASSERT(yam.count(1) == 1); + y1.cancel(); + ASSERT(!y1a.isReady()); + y1a.cancel(); + ASSERT(!y1b.isReady()); + ASSERT(yam.count(1) == 1); + y1b.cancel(); + ASSERT(y1b.getError().code() == error_code_actor_cancelled); + ASSERT(yam.count(1) == 0); + + return Void(); +} + +TEST_CASE("/flow/coro/YieldedAsyncMap/cancel2") { + YieldedAsyncMap yam; + + Future y1 = yam.onChange(1); + Future y2 = yam.onChange(2); + + auto* pyam = &yam; + uncancellable(trigger( + [pyam]() { + printf("Triggered\n"); + pyam->triggerAll(); + }, + delay(1))); + + co_await y1; + printf("Got y1\n"); + y2.cancel(); +} + +TEST_CASE("/flow/coro/AsyncVar/basic") { + AsyncVar av; + Future ch = av.onChange(); + ASSERT(!ch.isReady()); + av.set(5); + ASSERT(ch.isReady()); + ASSERT(av.get() == 5); + + ch = av.onChange(); + ASSERT(!ch.isReady()); + av.set(6); + ASSERT(ch.isReady()); + ASSERT(av.get() == 6); + + return Void(); +} + +namespace { + +Future waitAfterCancel(int* output) { + *output = 0; + try { + co_await Future(Never()); + } catch (...) { + } + co_await ((*output = 1, Future(Void()))); + ASSERT(false); +} + +} // namespace + +TEST_CASE("/fdbrpc/flow/wait_expression_after_cancel") { + int a = -1; + Future f = waitAfterCancel(&a); + ASSERT(a == 0); + f.cancel(); + ASSERT(a == 1); + return Void(); +} + +// Tests for https://github.com/apple/foundationdb/issues/1226 + +namespace { + +template +struct ShouldNotGoIntoClassContextStack; + +class Foo1 { +public: + explicit Foo1(int x) : x(x) {} + Future foo(); + +private: + int x; +}; + +Future Foo1::foo() { + co_await Future(); + co_return x; +} + +class [[nodiscard]] Foo2 { +public: + explicit Foo2(int x) : x(x) {} + Future foo(); + +private: + int x; +}; + +Future Foo2::foo() { + co_await Future(); + co_return x; +} + +class alignas(4) Foo3 { +public: + explicit Foo3(int x) : x(x) {} + Future foo(); + +private: + int x; +}; + +Future Foo3::foo() { + co_await Future(); + co_return x; +} + +struct Super {}; + +class Foo4 : Super { +public: + explicit Foo4(int x) : x(x) {} + Future foo(); + +private: + int x; +}; + +Future Foo4::foo() { + co_await Future(); + co_return x; +} + +struct Outer { + class Foo5 : Super { + public: + explicit Foo5(int x) : x(x) {} + Future foo(); + + private: + int x; + }; +}; + +Future Outer::Foo5::foo() { + co_await Future(); + co_return x; +} + +struct Tracker { + int copied; + bool moved; + Tracker(int copied = 0) : copied(copied), moved(false) {} + Tracker(Tracker&& other) : Tracker(other.copied) { + ASSERT(!other.moved); + other.moved = true; + } + Tracker& operator=(Tracker&& other) { + ASSERT(!other.moved); + other.moved = true; + this->moved = false; + this->copied = other.copied; + return *this; + } + Tracker(const Tracker& other) : Tracker(other.copied + 1) { ASSERT(!other.moved); } + Tracker& operator=(const Tracker& other) { + ASSERT(!other.moved); + this->moved = false; + this->copied = other.copied + 1; + return *this; + } + ~Tracker() = default; + + static Future listen(FutureStream stream) { + Tracker movedTracker = co_await stream; + ASSERT(!movedTracker.moved); + ASSERT(movedTracker.copied == 0); + } +}; + +} // namespace + +TEST_CASE("/flow/coro/PromiseStream/move") { + PromiseStream stream; + Future listener; + { + // This tests the case when a callback is added before + // a movable value is sent + listener = Tracker::listen(stream.getFuture()); + stream.send(Tracker{}); + co_await listener; + } + + { + // This tests the case when a callback is added before + // a unmovable value is sent + listener = Tracker::listen(stream.getFuture()); + Tracker namedTracker; + stream.send(std::move(namedTracker)); + co_await listener; + } + { + // This tests the case when no callback is added until + // after a movable value is sent + stream.send(Tracker{}); + stream.send(Tracker{}); + { + const Tracker& movedTracker = co_await stream.getFuture(); + ASSERT(!movedTracker.moved); + ASSERT(movedTracker.copied == 0); + } + { + const Tracker& movedTracker = co_await stream.getFuture(); + ASSERT(!movedTracker.moved); + ASSERT(movedTracker.copied == 0); + } + } + { + // This tests the case when no callback is added until + // after an unmovable value is sent + Tracker namedTracker1; + Tracker namedTracker2; + stream.send(namedTracker1); + stream.send(namedTracker2); + { + const Tracker& copiedTracker = co_await stream.getFuture(); + ASSERT(!copiedTracker.moved); + // must copy onto queue + ASSERT(copiedTracker.copied == 1); + } + { + const Tracker& copiedTracker = co_await stream.getFuture(); + ASSERT(!copiedTracker.moved); + // must copy onto queue + ASSERT(copiedTracker.copied == 1); + } + } +} + +TEST_CASE("/flow/coro/PromiseStream/move2") { + PromiseStream stream; + stream.send(Tracker{}); + const Tracker& tracker = co_await stream.getFuture(); + Tracker movedTracker = std::move(const_cast(tracker)); + ASSERT(tracker.moved); + ASSERT(!movedTracker.moved); + ASSERT(movedTracker.copied == 0); +} + +namespace { + +constexpr double mutexTestDelay = 0.00001; + +Future mutexTest(int id, FlowMutex* mutex, int n, bool allowError, bool* verbose) { + while (n-- > 0) { + double d = deterministicRandom()->random01() * mutexTestDelay; + if (*verbose) { + printf("%d:%d wait %f while unlocked\n", id, n, d); + } + co_await delay(d); + + if (*verbose) { + printf("%d:%d locking\n", id, n); + } + FlowMutex::Lock lock = co_await mutex->take(); + if (*verbose) { + printf("%d:%d locked\n", id, n); + } + + d = deterministicRandom()->random01() * mutexTestDelay; + if (*verbose) { + printf("%d:%d wait %f while locked\n", id, n, d); + } + co_await delay(d); + + // On the last iteration, send an error or drop the lock if allowError is true + if (n == 0 && allowError) { + if (deterministicRandom()->coinflip()) { + // Send explicit error + if (*verbose) { + printf("%d:%d sending error\n", id, n); + } + lock.error(end_of_stream()); + } else { + // Do nothing + if (*verbose) { + printf("%d:%d dropping promise, returning without unlock\n", id, n); + } + } + } else { + if (*verbose) { + printf("%d:%d unlocking\n", id, n); + } + lock.release(); + } + } + + if (*verbose) { + printf("%d Returning\n", id); + } +} + +} // namespace + +TEST_CASE("/flow/coro/FlowMutex") { + int count = 100000; + + // Default verboseness + bool verboseSetting = false; + // Useful for debugging, enable verbose mode for this iteration number + int verboseTestIteration = -1; + + try { + bool verbose = verboseSetting || count == verboseTestIteration; + + while (--count > 0) { + if (count % 1000 == 0) { + printf("%d tests left\n", count); + } + + FlowMutex mutex; + std::vector> tests; + + bool allowErrors = deterministicRandom()->coinflip(); + if (verbose) { + printf("\nTesting allowErrors=%d\n", allowErrors); + } + + Optional error; + + try { + for (int i = 0; i < 10; ++i) { + tests.push_back(mutexTest(i, &mutex, 10, allowErrors, &verbose)); + } + co_await waitForAll(tests); + + if (allowErrors) { + if (verbose) { + printf("Final wait in case error was injected by the last actor to finish\n"); + } + co_await mutex.take(); + } + } catch (Error& e) { + if (verbose) { + printf("Caught error %s\n", e.what()); + } + error = e; + } + if (error.present()) { + // Some actors can still be running, waiting while locked or unlocked, + // but all should become ready, some with errors. + if (verbose) { + printf("Waiting for completions. Future end states:\n"); + } + for (int i = 0; i < tests.size(); ++i) { + ErrorOr f = co_await errorOr(tests[i]); + if (verbose) { + printf(" %d: %s\n", i, f.isError() ? f.getError().what() : "done"); + } + } + } + + // If an error was caused, one should have been detected. + // Otherwise, no errors should be detected. + ASSERT(error.present() == allowErrors); + } + } catch (Error& e) { + printf("Error at count=%d\n", count + 1); + ASSERT(false); + } +} + +namespace { + +struct LifetimeLogger { + LifetimeLogger(std::ostream& ss, int id) : ss(ss), id(id) { + ss << "LifetimeLogger(" << id << "). "; + std::cout << "LifetimeLogger(" << id << ").\n"; + } + ~LifetimeLogger() { + ss << "~LifetimeLogger(" << id << "). "; + std::cout << "~LifetimeLogger(" << id << ").\n"; + } + + std::ostream& ss; + int id; +}; + +template +Future simple_await_test(std::stringstream& ss, Future f) { + ss << "start. "; + LifetimeLogger ll(ss, 0); + + co_await (f); + ss << "wait returned. "; + + LifetimeLogger ll2(ss, 1); + co_return; + ss << "after co_return. "; +} + +Future actor_cancel_test(std::stringstream& ss) { + ss << "start. "; + + LifetimeLogger ll(ss, 0); + + try { + co_await delay(100); + } catch (Error& e) { + ss << "error: " << e.what() << ". "; + } + + std::string foo = "foo"; + + co_await delay(1.0); + + ss << "wait returned. "; + + // foo is alive here! + + co_return; + ss << "after co_return. "; +} + +Future actor_throw_test(std::stringstream& ss) { + ss << "start. "; + + LifetimeLogger ll(ss, 0); + + co_await delay(0); + + throw io_error(); + + ss << "after throw. "; + co_return; + ss << "after co_return. "; +} + +Future simpleWaitTestCoro() { + std::cout << "simple_wait_test coro\n"; + std::cout << "=====================\n"; + std::stringstream ss; + try { + fmt::print("before wait\n"); + co_await simple_await_test(ss, Future(io_error())); + } catch (Error& e) { + fmt::print("error\n"); + ss << "error: " << e.what() << ". "; + } + std::cout << ss.str() << std::endl << std::endl; +}; + +template +Future tagAndForwardCoro(Uncancellable, Promise* pOutputPromise, U value, Future signal) { + Promise out(std::move(*pOutputPromise)); + co_await signal; + out.send(std::move(value)); +} + +// this is just there to make sure the stack is properly cleaned up during this test. However, in real code +// this indirection wouldn't be necessary +template +void tagAndForwardWrapper(Promise* pOutputPromise, U value, Future signal) { + tagAndForwardCoro(Uncancellable(), pOutputPromise, std::move(value), std::move(signal)); +} + +Future testUncancellable() { + Promise p; + Future fut = p.getFuture(); + tagAndForwardWrapper(&p, 2, delay(0.2)); + ASSERT(co_await fut == 2); +} + +Future delayAndReturn(Uncancellable) { + co_await delay(1.0); + co_return 5; +} + +Future testUncancellable2() { + try { + auto f = delayAndReturn(Uncancellable()); + co_await delay(0.1); + f.cancel(); + ASSERT(co_await f == 5); + } catch (Error& e) { + ASSERT(e.code() != error_code_actor_cancelled); + } +} + +Future sendToRandomPromise(Promise voidPromise, + Promise intPromise, + Promise doublePromise, + PromiseStream voidStream, + PromiseStream intStream) { + co_await delay(0.1); + int branch = deterministicRandom()->randomInt(0, 5); + switch (branch) { + case 0: + voidPromise.send(Void()); + break; + case 1: + intPromise.send(7); + break; + case 2: + doublePromise.send(8.0); + break; + case 3: + voidStream.send(Void()); + break; + case 4: + intStream.send(13); + break; + default: + ASSERT(false); + } + co_return branch; +} + +Future testChooseWhen() { + co_await Choose() + .When(delay(1), [](Void const&) { ASSERT(false); }) + .When(Future(Void()), [](Void const&) { /*success*/ }) + .When( + []() { + ASSERT(false); + return Future(Void()); + }, + [](Void const&) { ASSERT(false); }) + .run(); + try { + co_await Choose() + .When(Future(Never()), [](Void const&) { ASSERT(false); }) + .When([]() -> Future { throw io_error(); }, [](Void const&) { ASSERT(false); }) + .run(); + } catch (Error const& e) { + ASSERT(e.code() == error_code_io_error); + } + Promise voidPromise; + Promise intPromise; + Promise doublePromise; + PromiseStream voidStream; + PromiseStream intStream; + int chosenBranch = -1; + Future chooseAfter = sendToRandomPromise(voidPromise, intPromise, doublePromise, voidStream, intStream); + co_await Choose() + .When(voidPromise.getFuture(), + [&chosenBranch](Void const&) { + fmt::print("Chose Branch {}\n", 0); + chosenBranch = 0; + }) + .When(intPromise.getFuture(), + [&chosenBranch](int const& i) { + fmt::print("Chose Branch {}\n", 1); + chosenBranch = 1; + ASSERT(i == 7); + }) + .When(doublePromise.getFuture(), + [&chosenBranch](double const& d) { + fmt::print("Chose Branch {}\n", 2); + chosenBranch = 2; + ASSERT(d == 8.0); + }) + .When(voidStream.getFuture(), + [&chosenBranch](Void const&) { + fmt::print("Chose Branch {}\n", 3); + chosenBranch = 3; + }) + .When(intStream.getFuture(), + [&chosenBranch](int const& i) { + fmt::print("Chose Branch {}\n", 4); + chosenBranch = 4; + ASSERT(i == 13); + }) + .run(); + ASSERT(chosenBranch == co_await chooseAfter); +} + +Future delaySequence(PromiseStream p, std::vector* nums) { + for (auto n : *nums) { + co_await delay(n); + p.send(n); + } +} + +Future futureStreamTest() { + std::vector rnds; + rnds.reserve(100); + for (int i = 0; i < 100; ++i) { + rnds.emplace_back(deterministicRandom()->random01() * 0.1); + } + PromiseStream promise; + auto fut = promise.getFuture(); + Future f = delaySequence(std::move(promise), &rnds); + int i = 0; + while (!f.isReady() || fut.isReady()) { + try { + ASSERT(co_await fut == rnds[i++]); + } catch (Error& e) { + if (e.code() == error_code_broken_promise) { + break; + } + throw; + } + } + ASSERT(i == 100); +} + +template +Future verifyAddress(T* ptr, intptr_t address) { + co_await delay(0.1); + ASSERT(reinterpret_cast(ptr) == address); +} + +Future stackMemoryTest() { + int a = deterministicRandom()->randomInt(0, 1000); + auto aPtr = reinterpret_cast(&a); + co_await verifyAddress(&a, aPtr); + ASSERT(aPtr == reinterpret_cast(&a)); +} + +AsyncGenerator simpleGeneratorTest() { + for (int i = 0; true; ++i) { + co_await delay(0.01); + co_yield i; + } +} + +Future testSimpleGenerator() { + auto gen = simpleGeneratorTest(); + for (int i = 0; i < 100; ++i) { + ASSERT(gen); + ASSERT(co_await gen() == i); + } +} + +AsyncGenerator readBlocks(Reference file, size_t blockSize) { + auto size = size_t(co_await file->size()); + decltype(size) offset = 0; + Arena arena; + auto buffer = new (arena) uint8_t[blockSize]; + while (size - offset > 0) { + auto read = co_await file->read(buffer, int(std::min(size - offset, blockSize)), offset); + offset += read; + co_yield StringRef(buffer, read); + } +} + +AsyncGenerator readLines(Reference file) { + constexpr size_t blockSize = 4096; + auto gen = readBlocks(file, blockSize); + Arena arena; + StringRef line; + while (gen) { + StringRef block; + try { + block = co_await gen(); + } catch (Error& e) { + if (e.code() == error_code_end_of_stream) { + break; + } + throw; + } + bool lastIsNewline = char(block[block.size() - 1]) == '\n'; + ASSERT(!block.empty()); + if (!line.empty()) { + // we have to carry over data from a previous block + auto firstLine = block.eatAny("\n"_sr, nullptr); + line = concatenateStrings(arena, line, firstLine); + if (!block.empty() || lastIsNewline) { + // we read at least one block + co_yield line; + line = StringRef(); + arena = Arena(); + } + } + ASSERT(line.empty() || block.empty()); + while (!block.empty()) { + line = block.eatAny("\n"_sr, nullptr); + if (!block.empty()) [[likely]] { + co_yield line; + } else { + if (lastIsNewline) { + co_yield line; + line = StringRef(); + arena = Arena(); + } else { + // this line might not be complete, so we need to potentially concatenate it with + // the beginning of the next block + line = StringRef(arena, line); + } + } + } + } + if (!line.empty()) { + co_yield line; + } +} + +AsyncGenerator lineGenerator(size_t minLen, + size_t maxLen, + size_t blockSize, + Standalone>* lines) { + size_t remainingLine = 0; + bool firstBlock = true; + bool startedLine = false; + loop { + Arena arena; + auto block = new (arena) uint8_t[blockSize]; + size_t offset = 0; + while (offset < blockSize) { + size_t lineLength = + remainingLine == 0 ? deterministicRandom()->randomSkewedUInt32(minLen, maxLen) : remainingLine; + auto toWrite = std::min(blockSize - offset, lineLength); + remainingLine = lineLength - toWrite; + for (size_t i = 0; i < toWrite; ++i) { + block[offset + i] = uint8_t(deterministicRandom()->randomAlphaNumeric()); + } + StringRef currLine; + if (remainingLine == 0) { + block[offset + toWrite - 1] = uint8_t('\n'); + currLine = StringRef(block + offset, int(toWrite - 1)); + } else { + currLine = StringRef(block + offset, int(toWrite)); + } + if (startedLine) { + lines->back() = concatenateStrings(lines->arena(), lines->back(), currLine); + } else { + lines->push_back_deep(lines->arena(), currLine); + } + startedLine = remainingLine != 0; + offset += toWrite; + } + if (firstBlock) { + firstBlock = false; + } + co_yield StringRef(block, blockSize); + } +} + +Future>> writeTestFile(Reference file) { + size_t length = (1 << 20); // 1MB + constexpr size_t minLineLength = 10; + constexpr size_t maxLineLength = 1024; + constexpr size_t blockSize = 4096; + Standalone> result; + auto gen = lineGenerator(minLineLength, maxLineLength, blockSize, &result); + size_t offset = 0; + while (offset < length) { + auto str = co_await gen(); + for (int i = 0; i < str.size(); ++i) { + if (str[i] == '\0') { + fmt::print("Attempted to write 0-byte at block-offset {}\n", i); + ASSERT(false); + } + } + co_await file->write(str.begin(), str.size(), size_t(offset)); + offset += str.size(); + } + co_await file->flush(); + co_return result; +} + +std::string rndFileName() { + std::string rndString; + rndString.reserve(16); + for (int i = 0; i < 16; ++i) { + rndString.push_back(deterministicRandom()->randomAlphaNumeric()); + } + return fmt::format("cppcoro_test_readlines_{}_{}.txt", int(now()), rndString); +} + +Future testReadLines() { + auto filename = rndFileName(); + auto file = co_await IAsyncFileSystem::filesystem()->open( + filename, IAsyncFile::OPEN_CREATE | IAsyncFile::OPEN_READWRITE, 0640); + auto expectedLines = co_await writeTestFile(file); + auto lines = readLines(file); + for (int i = 0; i < expectedLines.size(); ++i) { + ASSERT(lines); + auto line = co_await lines(); + if (line != expectedLines[i]) { + fmt::print("ERROR on line {}:\n", i); + fmt::print("\t{}\n", line); + fmt::print("\t!=\n"); + fmt::print("\t{}\n", expectedLines[i]); + ASSERT(false); + } + } + try { + auto line = co_await lines(); + fmt::print("produced line after expected last line {}\n", expectedLines.size()); + fmt::print("\t{}\n", line.toString()); + // we should be done here + ASSERT(false); + } catch (Error& e) { + ASSERT(e.code() == error_code_end_of_stream); + } + // we only want to delete the file if the test was successful, so we don't wrap the above in a try-catch + co_await IAsyncFileSystem::filesystem()->deleteFile(filename, false); +} + +AsyncGenerator emptyGenerator() { + co_return; +} + +Future testEmptyGenerator() { + auto gen = emptyGenerator(); + while (gen) { + try { + fmt::print("Value: {}\n", co_await gen()); + ASSERT(false); + } catch (Error& e) { + ASSERT(e.code() == error_code_end_of_stream); + } + } +} + +template +Generator walkElements(std::vector elements) { + for (auto const& element : elements) { + co_yield element; + } +} + +void testElementWalker() { + int numElements = deterministicRandom()->randomInt(100, 1000); + std::vector elements; + elements.reserve(numElements); + for (int i = 0; i < numElements; ++i) { + elements.push_back(deterministicRandom()->randomInt(0, std::numeric_limits::max())); + } + auto verificationCopy = elements; + auto gen = walkElements(std::move(elements)); + for (auto e : verificationCopy) { + ASSERT(e == *gen); + ++gen; + } + try { + ASSERT(!gen); + } catch (Error& e) { + ASSERT(e.code() == error_code_end_of_stream); + } +} + +Future simpleCoro() { + co_await delay(0.1); +} + +Future testSimpleCoro() { + Future c = simpleCoro(); + fmt::print("Coro created\n"); + co_await delay(1.0); + fmt::print("After sleep\n"); + co_await c; + fmt::print("After await\n"); +} + +Generator fibonacci() { + unsigned curr = 1, next = 1; + loop { + co_yield curr; + curr = std::exchange(next, next + curr); + } +} + +std::vector fibDivisible(unsigned n, unsigned by) { + std::vector res; + res.reserve(n); + for (auto i = fibonacci(); res.size() < n; ++i) { + if (*i % by == 0) { + res.push_back(*i); + } + } + return res; +} + +std::vector fibDivisibleBad(unsigned n, unsigned by) { + unsigned curr = 1, next = 1; + std::vector res; + res.reserve(n); + while (res.size() < n) { + if (curr % by == 0) { + res.push_back(curr); + } + curr = std::exchange(next, curr + next); + } + return res; +} + +void testFibDivisible() { + { + auto fibs = fibDivisibleBad(4, 3); + std::vector expected({ 3, 21, 144, 987 }); + ASSERT(fibs.size() == expected.size()); + for (int i = 0; i < expected.size(); ++i) { + ASSERT(fibs[i] == expected[i]); + } + } + { + auto fibs = fibDivisible(4, 3); + std::vector expected({ 3, 21, 144, 987 }); + ASSERT(fibs.size() == expected.size()); + for (int i = 0; i < expected.size(); ++i) { + ASSERT(fibs[i] == expected[i]); + } + } +} + +} // namespace + +// TODO: the test is excluded in RandomUnitTests due to failures that happen when run the test concurrently with other +// unit tests. +TEST_CASE("/flow/coro/generators") { + testFibDivisible(); + co_await testEmptyGenerator(); + co_await testSimpleGenerator(); + co_await testReadLines(); + testElementWalker(); +} + +TEST_CASE("/flow/coro/actor") { + co_await testSimpleCoro(); + std::cout << "simple_wait_test\n"; + std::cout << "================\n"; + { + std::stringstream ss1; + try { + co_await simple_await_test(ss1, delay(1)); + } catch (Error& e) { + ss1 << "error: " << e.what() << ". "; + } + std::cout << ss1.str() << std::endl; + ASSERT(ss1.str() == "start. LifetimeLogger(0). wait returned. LifetimeLogger(1). ~LifetimeLogger(1). " + "~LifetimeLogger(0). "); + } + + co_await simpleWaitTestCoro(); + + std::cout << std::endl; + std::cout << "simple_wait_test\n"; + std::cout << "================\n"; + { + std::stringstream ss2; + try { + fmt::print("before wait\n"); + co_await simple_await_test(ss2, Future(io_error())); + } catch (Error& e) { + fmt::print("error\n"); + ss2 << "error: " << e.what() << ". "; + } + std::cout << ss2.str() << std::endl; + ASSERT(ss2.str() == "start. LifetimeLogger(0). ~LifetimeLogger(0). error: Disk i/o operation failed. "); + } + + std::cout << std::endl; + std::cout << "actor_cancel_test\n"; + std::cout << "=================\n"; + { + std::stringstream ss3; + { + Future f = actor_cancel_test(ss3); + co_await delay(1); + } + std::cout << ss3.str() << std::endl; + ASSERT(ss3.str() == "start. LifetimeLogger(0). error: Asynchronous operation cancelled. " + "~LifetimeLogger(0). "); + } + + std::cout << std::endl; + std::cout << "actor_throw_test\n"; + std::cout << "================\n"; + { + std::stringstream ss4; + try { + co_await actor_throw_test(ss4); + } catch (Error& e) { + ss4 << "error: " << e.what() << ". "; + } + std::cout << ss4.str() << std::endl; + ASSERT(ss4.str() == "start. LifetimeLogger(0). ~LifetimeLogger(0). error: Disk i/o operation failed. "); + } + + std::cout << std::endl; + co_await delay(0.1); + co_await testChooseWhen(); + co_await testUncancellable(); + co_await testUncancellable2(); + co_await futureStreamTest(); + co_await stackMemoryTest(); +} diff --git a/fdbserver/workloads/UnitTests.actor.cpp b/fdbserver/workloads/UnitTests.actor.cpp index 69e1308c22..5800eaa434 100644 --- a/fdbserver/workloads/UnitTests.actor.cpp +++ b/fdbserver/workloads/UnitTests.actor.cpp @@ -25,6 +25,7 @@ void forceLinkIndexedSetTests(); void forceLinkDequeTests(); void forceLinkFlowTests(); +void forceLinkCoroTests(); void forceLinkVersionedMapTests(); void forceLinkMemcpyTests(); void forceLinkMemcpyPerfTests(); @@ -95,6 +96,7 @@ struct UnitTestWorkload : TestWorkload { forceLinkIndexedSetTests(); forceLinkDequeTests(); forceLinkFlowTests(); + forceLinkCoroTests(); forceLinkVersionedMapTests(); forceLinkMemcpyTests(); forceLinkMemcpyPerfTests(); diff --git a/flow/CMakeLists.txt b/flow/CMakeLists.txt index 4e876b02fe..6a7bd03225 100644 --- a/flow/CMakeLists.txt +++ b/flow/CMakeLists.txt @@ -124,6 +124,8 @@ foreach(ft flow flow_sampling flowlinktest) find_library(CORE_FOUNDATION CoreFoundation) target_link_libraries(${ft} PRIVATE ${IO_KIT} ${CORE_FOUNDATION}) endif() + find_package(Coroutines COMPONENTS Experimental Final REQUIRED) + target_link_libraries(${ft} PUBLIC std::coroutines) endforeach() if(OPEN_FOR_IDE) diff --git a/flow/include/flow/Arena.h b/flow/include/flow/Arena.h index 068f7a8e52..f453426a7c 100644 --- a/flow/include/flow/Arena.h +++ b/flow/include/flow/Arena.h @@ -722,6 +722,40 @@ inline static uint8_t* mutateString(StringRef& s) { return const_cast(s.begin()); } +template +static Standalone concatenateStrings(StringRefType... strs) { + int totalSize = 0; + for (auto const& s : { strs... }) { + totalSize += s.size(); + } + + Standalone str = makeString(totalSize); + uint8_t* buf = mutateString(str); + + for (auto const& s : { strs... }) { + buf = s.copyTo(buf); + } + + return str; +} + +template +static StringRef concatenateStrings(Arena& arena, StringRefType... strs) { + int totalSize = 0; + for (auto const& s : { strs... }) { + totalSize += s.size(); + } + + StringRef str = makeString(totalSize, arena); + uint8_t* buf = mutateString(str); + + for (auto const& s : { strs... }) { + buf = s.copyTo(buf); + } + + return str; +} + template inline void load(Archive& ar, StringRef& value) { uint32_t length; diff --git a/flow/include/flow/CoroUtils.h b/flow/include/flow/CoroUtils.h new file mode 100644 index 0000000000..dd7ee4ac9c --- /dev/null +++ b/flow/include/flow/CoroUtils.h @@ -0,0 +1,240 @@ +/* + * CoroUtils.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2023 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLOW_COROUTILS_H +#define FLOW_COROUTILS_H + +#include "flow/flow.h" + +namespace coro { + +template +using ConditionalActorCallback = std::conditional_t == FutureType::Future, + ActorCallback>, + ActorSingleCallback>>; + +template +struct ChooseImplCallback; + +template +struct ChooseImplCallback + : ConditionalActorCallback, Idx, F>, + ChooseImplCallback { + + using ThisCallback = ConditionalActorCallback, Idx, F>; + using ValueType = FutureReturnTypeT; + static constexpr FutureType futureType = GetFutureTypeV; + + [[nodiscard]] Parent* getParent() { return static_cast(this); } + + void registerCallbacks() { + if constexpr (futureType == FutureType::Future) { + StrictFuture sf = std::get(getParent()->futures); + sf.addCallbackAndClear(static_cast(this)); + } else { + auto sf = std::get(getParent()->futures); + sf.addCallbackAndClear(static_cast(this)); + } + if constexpr (sizeof...(Args) > 0) { + ChooseImplCallback::registerCallbacks(); + } + } + + void a_callback_fire(ThisCallback*, ValueType const& value) { + getParent()->removeCallbacks(); + try { + std::get(getParent()->functions)(value); + getParent()->SAV::sendAndDelPromiseRef(Void()); + } catch (Error& e) { + getParent()->SAV::sendErrorAndDelPromiseRef(e); + } catch (...) { + getParent()->SAV::sendErrorAndDelPromiseRef(unknown_error()); + } + } + + void a_callback_error(ThisCallback*, Error e) { + getParent()->removeCallbacks(); + getParent()->SAV::sendErrorAndDelPromiseRef(e); + } + + void removeCallbacks() { + ThisCallback::remove(); + if constexpr (sizeof...(Args) > 0) { + ChooseImplCallback::removeCallbacks(); + } + } +}; + +template +struct ChooseImplCallback { +#ifdef ENABLE_SAMPLING + LineageReference* lineageAddr() { return currentLineage; } +#endif +}; + +template +struct ChooseImplActor final : Actor, + ChooseImplCallback, 0, Args...>, + FastAllocated> { + std::tuple futures; + std::tuple const&)>...> functions; + + using FastAllocated>::operator new; + using FastAllocated>::operator delete; + + ChooseImplActor(std::tuple&& futures, + std::tuple const&)>...>&& functions) + : Actor(), futures(futures), functions(functions) { + ChooseImplCallback, 0, Args...>::registerCallbacks(); + } + + void cancel() override { + auto waitState = actor_wait_state; + actor_wait_state = -1; + if (waitState) { + ChooseImplCallback, 0, Args...>::removeCallbacks(); + SAV::sendErrorAndDelPromiseRef(actor_cancelled()); + } + } + + void destroy() override { delete this; } +}; + +template +class ChooseClause { + std::tuple futures; + std::tuple const&)>...> functions; + bool noop = false; + + template + auto getNoop() { + using FType = std::conditional_t, Future>; + return ChooseClause(std::tuple_cat(std::move(futures), std::make_tuple(FType())), true); + } + + template + auto getNoop(const Future& future) { + return ChooseClause const&>( + std::tuple_cat(std::move(futures), std::make_tuple(std::cref(future))), true); + } + + template + auto getNoop(const FutureStream& future) { + return ChooseClause const&>( + std::tuple_cat(std::move(futures), std::make_tuple(std::cref(future))), true); + } + +public: + ChooseClause(std::tuple&& futures, + std::tuple const&)>...>&& functions) + : futures(std::move(futures)), functions(std::move(functions)) {} + explicit ChooseClause(std::tuple&& futures, bool noop = false) : futures(std::move(futures)), noop(noop) {} + explicit ChooseClause() : futures(std::tuple<>()) {} + + auto When(std::invocable auto futureCallback, std::invocable auto fun) { + using FType = decltype(futureCallback()); + using FReturnType = FutureReturnTypeT; + constexpr bool isStream = GetFutureTypeV == FutureType::FutureStream; + using ArgType = std::conditional_t, Future>; + if (noop) { + return getNoop(); + } + auto future = futureCallback(); + if (future.isReady()) { + fun(future.get()); + return getNoop(); + } + std::function function = fun; + return ChooseClause(std::tuple_cat(std::move(futures), std::make_tuple(future)), + std::tuple_cat(std::move(functions), std::make_tuple(function))); + } + + template + auto When(Future const& future, std::invocable auto fun) { + static_assert(std::is_same_v())), void>, + "When-handler must return void (and can't be awaitable)"); + if (noop) { + return getNoop(future); + } + if (future.isReady()) { + fun(future.get()); + return getNoop(future); + } + std::function function = fun; + return ChooseClause const&>( + std::tuple_cat(std::move(futures), std::make_tuple(std::cref(future))), + std::tuple_cat(std::move(functions), std::make_tuple(function))); + } + + template + auto When(FutureStream const& futureStream, std::invocable auto fun) { + static_assert(std::is_same_v())), void>, + "When-handler must return void (and't can't be awaitable)"); + if (noop) { + return getNoop(futureStream); + } + if (futureStream.isReady()) { + auto fs = futureStream; + fun(fs.pop()); + return getNoop(futureStream); + } + std::function function = fun; + return ChooseClause const&>( + std::tuple_cat(std::move(futures), std::make_tuple(std::cref(futureStream))), + std::tuple_cat(std::move(functions), std::make_tuple(function))); + } + + [[nodiscard]] Future run() { + if (noop) { + return Void(); + } + return Future(new ChooseImplActor(std::move(futures), std::move(functions))); + } +}; + +} // namespace coro + +using Choose = coro::ChooseClause<>; + +template +AsyncGenerator map(AsyncGenerator gen, F pred) { + while (gen) { + auto val = co_await gen(); + if (pred(val)) { + co_yield val; + } + } +} + +template +AsyncGenerator toGenerator(FutureStream stream) { + while (true) { + try { + co_yield co_await stream; + } catch (Error& e) { + if (e.code() == error_code_end_of_stream) { + co_return; + } + throw; + } + } +} + +#endif // FLOW_COROUTILS_H diff --git a/flow/include/flow/Coroutines.h b/flow/include/flow/Coroutines.h new file mode 100644 index 0000000000..29914206c9 --- /dev/null +++ b/flow/include/flow/Coroutines.h @@ -0,0 +1,167 @@ +/* + * Coroutines.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2023 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLOW_COROUTINES_H +#define FLOW_COROUTINES_H + +#pragma once + +#if __has_include() +#include +namespace n_coroutine = ::std; +#elif __has_include() +#include +namespace n_coroutine = ::std::experimental; +#endif +#include +#include +#include + +#include "flow/flow.h" +#include "flow/Error.h" +#include "flow/CoroutinesImpl.h" + +struct Uncancellable {}; + +template +class AsyncGenerator { + PromiseStream* nextPromise; + n_coroutine::coroutine_handle<> handle; + +public: + explicit AsyncGenerator(PromiseStream* promise, n_coroutine::coroutine_handle<> handle) + : nextPromise(promise), handle(handle) {} + + ~AsyncGenerator() { handle.destroy(); } + + Future operator()() { + handle.resume(); + Error error; + T res; + try { + res = co_await nextPromise->getFuture(); + } catch (Error& e) { + error = e; + } catch (...) { + error = unknown_error(); + } + co_await delay(0); + if (error.isValid()) { + throw error; + } + co_return res; + } + + explicit operator bool() { return !handle.done(); } +}; + +// Inspired from https://www.scs.stanford.edu/~dm/blog/c++-coroutines.html +template +class Generator { +public: // types + using promise_type = coro::GeneratorPromise; + using handle_type = n_coroutine::coroutine_handle; + using value_type = T; + using difference_type = std::ptrdiff_t; + +private: + handle_type handle; + +public: + explicit Generator(handle_type h) : handle(h) {} + Generator() {} + Generator(Generator const& other) : handle(other.handle) { + if (handle) { + handle.promise().addRef(); + } + } + Generator(Generator&& other) : handle(std::move(other.handle)) { other.handle = handle_type{}; } + ~Generator() { + if (handle) { + handle.promise().delRef(); + } + } + + Generator& operator=(Generator const& other) { + if (handle) { + handle.promise().delRef(); + } + handle = other.handle; + if (handle) { + handle.promise().addRef(); + } + return *this; + } + + Generator& operator=(Generator&& other) { + if (handle) { + handle.promise().delRef(); + } + handle = std::move(other.handle); + other.handle = handle_type{}; + return *this; + } + + explicit operator bool() { return handle && !handle.done(); } + + static Generator end() { return Generator{}; } + + bool operator==(Generator const& other) const { + bool selfValid = handle && !handle.done(); + bool otherValid = other.handle && !other.handle.done(); + if (selfValid == otherValid) { + // if both generator are done, we consider them the same, otherwise they are only the same if both point to + // the same coroutine + return !selfValid || handle == other.handle; + } + return false; + } + + bool operator!=(Generator const& other) const { return !(*this == other); } + + const T& operator*() const& { + auto& promise = handle.promise(); + if (promise.error.isValid()) { + throw promise.error; + } + ASSERT(promise.value.has_value()); + return promise.value.value(); + } + + Generator& operator++() { + handle.resume(); + return *this; + } + + void operator++(int) { ++(*this); } +}; + +template +struct [[maybe_unused]] n_coroutine::coroutine_traits, Args...> { + using promise_type = coro::CoroPromise>; +}; + +template +struct [[maybe_unused]] n_coroutine::coroutine_traits, Args...> { + static_assert(!coro::hasUncancellable, "AsyncGenerator can't be uncancellable"); + using promise_type = coro::AsyncGeneratorPromise; +}; + +#endif // FLOW_COROUTINES_H diff --git a/flow/include/flow/CoroutinesImpl.h b/flow/include/flow/CoroutinesImpl.h new file mode 100644 index 0000000000..01d066dd59 --- /dev/null +++ b/flow/include/flow/CoroutinesImpl.h @@ -0,0 +1,498 @@ +/* + * CoroutinesImpl.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2023 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLOW_COROUTINESIMPL_H +#define FLOW_COROUTINESIMPL_H + +#include "flow/flow.h" + +template +class Generator; + +template +class AsyncGenerator; + +struct Uncancellable; + +namespace coro { + +template +struct FutureReturnType; + +template +struct FutureReturnType> { + using type = T; +}; + +template +struct FutureReturnType> { + using type = T; +}; + +template +struct FutureReturnType const&> { + using type = T; +}; + +template +struct FutureReturnType const&> { + using type = T; +}; + +template +using FutureReturnTypeT = typename FutureReturnType::type; + +enum class FutureType { FutureStream, Future }; + +template +struct GetFutureType; + +template +struct GetFutureType> { + constexpr static FutureType value = FutureType::Future; +}; + +template +struct GetFutureType> { + constexpr static FutureType value = FutureType::FutureStream; +}; + +template +struct GetFutureType const&> { + constexpr static FutureType value = FutureType::Future; +}; + +template +struct GetFutureType const&> { + constexpr static FutureType value = FutureType::FutureStream; +}; + +template +inline constexpr FutureType GetFutureTypeV = GetFutureType::value; + +template +struct CoroActor final : Actor, Void, T>> { + using ValType = std::conditional_t, Void, T>; + + static void* operator new(size_t s) { return allocateFast(int(s)); } + static void operator delete(void* p, size_t s) { freeFast(int(s), p); } + + n_coroutine::coroutine_handle<> handle; + + int8_t& waitState() { return Actor::actor_wait_state; } + + template + void set(U&& value) { + new (&SAV::value()) ValType(std::forward(value)); + SAV::error_state = Error(SAV::SET_ERROR_CODE); + } + + void setError(Error const& e) { SAV::error_state = e; } + + void cancel() override { + if constexpr (IsCancellable) { + auto prev_wait_state = Actor::actor_wait_state; + + // Set wait state to -1 + Actor::actor_wait_state = -1; + + // If the actor is waiting, then resume the coroutine to throw actor_cancelled(). + if (prev_wait_state > 0) { + handle.resume(); + } + } + } + + void destroy() override { delete this; } +}; + +template +struct AwaitableFutureStore { + std::variant data; + + constexpr bool isSet() const noexcept { return data.index() != 0 || std::get<0>(data).isValid(); } + void copy(U v) { data = std::move(v); } + void set(U&& v) { data = std::move(v); } + void set(U const& v) { data = v; } + + const U& getRef() const { + switch (data.index()) { + case 0: + throw std::get<0>(data); + case 1: + return std::get<1>(data); + } + UNREACHABLE(); + } + + U&& get() && { + switch (data.index()) { + case 0: + throw std::get<0>(data); + case 1: + return std::get<1>(std::move(data)); + } + UNREACHABLE(); + } +}; + +template +using ToFutureVal = std::conditional_t, Void, T>; + +template +struct AwaitableResume; + +template +struct AwaitableResume { + [[maybe_unused]] void await_resume() { + auto self = static_cast(this); + self->resumeImpl(); + if (self->future.isError()) { + throw self->future.getError(); + } + } +}; + +template +struct AwaitableResume { + T const& await_resume() { + auto self = static_cast(this); + if (self->resumeImpl()) { + if (self->future.isError()) { + throw self->future.getError(); + } + return self->future.get(); + } + return self->store.getRef(); + } +}; + +template +struct AwaitableResume { + T await_resume() { + auto self = static_cast(this); + if (self->resumeImpl()) { + if (self->future.isError()) { + throw self->future.getError(); + } + return self->future.pop(); + } + return std::move(self->store).get(); + } +}; + +template +struct AwaitableFuture : std::conditional_t>, Callback>>, + AwaitableResume, U, IsStream> { + using FutureValue = ToFutureVal; + using FutureType = std::conditional_t, Future const&>; + FutureType future; + promise_type* pt = nullptr; + AwaitableFutureStore store; + + AwaitableFuture(const FutureType& f, promise_type* pt) : future(f), pt(pt) {} + + void fire(FutureValue const& value) override { + store.set(value); + pt->resume(); + } + void fire(FutureValue&& value) override { + store.set(std::move(value)); + pt->resume(); + } + + void error(Error error) override { + store.data = error; + pt->resume(); + } + + [[maybe_unused]] [[nodiscard]] bool await_ready() const { + if (pt->waitState() < 0) { + pt->waitState() = -2; + // actor was cancelled + return true; + } + return future.isReady(); + } + + [[maybe_unused]] void await_suspend(n_coroutine::coroutine_handle<> h) { + // Create a coroutine callback if it's the first time being suspended + pt->setHandle(h); + + // Set wait_state and add callback + pt->waitState() = 1; + + if constexpr (IsStream) { + auto sf = future; + sf.addCallbackAndClear(this); + } else { + StrictFuture sf = future; + sf.addCallbackAndClear(this); + } + } + + bool resumeImpl() { + // If actor is cancelled, then throw actor_cancelled() + switch (pt->waitState()) { + case -1: + this->remove(); + case -2: + // -2 means that the `await_suspend` call returned `true`, so we shouldn't remove the callback. + // if the wait_state is -1 we still have to throw, so we fall through to the -2 case + throw actor_cancelled(); + } + + bool wasReady = pt->waitState() == 0; + // Actor return from waiting, remove callback and reset wait_state. + if (pt->waitState() > 0) { + this->remove(); + + pt->waitState() = 0; + } + return wasReady; + } +}; + +template +struct ActorMember { + T* member; + explicit ActorMember(n_coroutine::coroutine_handle<> handle) : member(new T(handle)) {} + T* ptr() { return member; } + T* operator->() { return member; } + const T* operator->() const { return member; } +}; + +template +struct ActorMember { + T member; + explicit ActorMember(n_coroutine::coroutine_handle<> handle) : member(handle) {} + T* ptr() { return &member; } + T* operator->() { return &member; } + const T* operator->() const { return &member; } +}; + +template +struct CoroReturn { + template + void return_value(U&& value) { + static_cast(this)->coroActor->set(std::forward(value)); + } +}; + +template +struct CoroReturn { + void return_void() { static_cast(this)->coroActor->set(Void()); } +}; + +template +struct CoroPromise : CoroReturn> { + using promise_type = CoroPromise; + using ActorType = coro::CoroActor; + using ReturnValue = std::conditional_t, Void, T>; + using ReturnFutureType = Future; + + ActorType* coroActor; + + CoroPromise() : coroActor(new ActorType()) {} + + n_coroutine::coroutine_handle handle() { + return n_coroutine::coroutine_handle::from_promise(*this); + } + + static void* operator new(size_t s) { return allocateFast(int(s)); } + static void operator delete(void* p, size_t s) { freeFast(int(s), p); } + + ReturnFutureType get_return_object() noexcept { return ReturnFutureType(coroActor); } + + [[nodiscard]] n_coroutine::suspend_never initial_suspend() const noexcept { return {}; } + + auto final_suspend() noexcept { + struct FinalAwaitable { + ActorType* sav; + // for debugging output only + explicit FinalAwaitable(ActorType* sav) : sav(sav) {} + + [[nodiscard]] bool await_ready() const noexcept { return true; } + void await_resume() const noexcept { + if (sav->isError()) { + sav->finishSendErrorAndDelPromiseRef(); + } else { + sav->finishSendAndDelPromiseRef(); + } + } + constexpr void await_suspend(n_coroutine::coroutine_handle<>) const noexcept {} + }; + return FinalAwaitable(coroActor); + } + + void unhandled_exception() { + // The exception should always be type Error. + try { + std::rethrow_exception(std::current_exception()); + } catch (const Error& error) { + // if (Actor::actor_wait_state == -1 && error.code() == error_code_operation_cancelled) { + // return; + // } + coroActor->setError(error); + // SAV::sendErrorAndDelPromiseRef(error); + } catch (...) { + coroActor->setError(unknown_error()); + // SAV::sendErrorAndDelPromiseRef(unknown_error()); + } + } + + void setHandle(n_coroutine::coroutine_handle<> h) { coroActor->handle = h; } + + void resume() { coroActor->handle.resume(); } + + int8_t& waitState() { return coroActor->waitState(); } + + template + auto await_transform(const Future& future) { + return coro::AwaitableFuture{ future, this }; + } + + template + auto await_transform(const FutureStream& futureStream) { + return coro::AwaitableFuture{ futureStream, this }; + } +}; + +template +struct GeneratorPromise { + using handle_type = n_coroutine::coroutine_handle>; + static void* operator new(size_t s) { return allocateFast(int(s)); } + static void operator delete(void* p, size_t s) { freeFast(int(s), p); } + + Error error; + std::optional value; + mutable unsigned refCount = 1; + + void addRef() const { refCount += 1; } + void delRef() const { + if (--refCount == 0) { + const_cast*>(this)->handle().destroy(); + } + } + + n_coroutine::suspend_never initial_suspend() { return {}; } + n_coroutine::suspend_always final_suspend() noexcept { return {}; } + + auto handle() { return handle_type::from_promise(*this); } + + Generator get_return_object() { return Generator(handle_type::from_promise(*this)); } + + void unhandled_exception() { + try { + std::rethrow_exception(std::current_exception()); + } catch (Error& e) { + error = e; + } catch (...) { + error = unknown_error(); + } + } + + template From> // C++20 concept + n_coroutine::suspend_always yield_value(From&& from) { + value = std::forward(from); + return {}; + } + + void return_void() {} +}; + +template +struct AsyncGeneratorPromise { + using promise_type = AsyncGeneratorPromise; + + static void* operator new(size_t s) { return allocateFast(int(s)); } + static void operator delete(void* p, size_t s) { freeFast(int(s), p); } + + n_coroutine::coroutine_handle handle() { + return n_coroutine::coroutine_handle::from_promise(*this); + } + + [[nodiscard]] n_coroutine::suspend_always initial_suspend() const noexcept { return {}; } + [[nodiscard]] n_coroutine::suspend_always final_suspend() const noexcept { return {}; } + + AsyncGenerator get_return_object() { return AsyncGenerator(&nextPromise, handle()); } + + void return_void() { nextPromise.sendError(end_of_stream()); } + + void unhandled_exception() { + // The exception should always be type Error. + try { + std::rethrow_exception(std::current_exception()); + } catch (const Error& error) { + nextPromise.sendError(error); + } catch (...) { + nextPromise.sendError(unknown_error()); + } + } + + template U> + n_coroutine::suspend_always yield_value(U&& value) { + nextPromise.send(std::forward(value)); + return {}; + } + + void setHandle(n_coroutine::coroutine_handle<> h) { mHandle = h; } + int8_t& waitState() { return mWaitState; } + void resume() { mHandle.resume(); } + + template + auto await_transform(const Future& future) { + return coro::AwaitableFuture{ future, this }; + } + + template + auto await_transform(const FutureStream& futureStream) { + return coro::AwaitableFuture{ futureStream, this }; + } + + n_coroutine::coroutine_handle<> mHandle; + PromiseStream nextPromise; + int8_t mWaitState = 0; +}; + +template +struct HasUncancellable; + +template <> +struct HasUncancellable<> { + static constexpr bool value = false; +}; + +template +struct HasUncancellable { + static constexpr bool value = HasUncancellable::value; +}; + +template +struct HasUncancellable { + static constexpr bool value = true; +}; + +template +inline constexpr bool hasUncancellable = HasUncancellable::value; + +} // namespace coro + +#endif // FLOW_COROUTINESIMPL_H diff --git a/flow/include/flow/flow.h b/flow/include/flow/flow.h index a72bc781f6..286dbba14a 100644 --- a/flow/include/flow/flow.h +++ b/flow/include/flow/flow.h @@ -874,6 +874,20 @@ public: destroy(); } + // this is only used for C++ coroutines + void finishSendErrorAndDelPromiseRef() { + if (promises == 1 && !futures) { + // No one is left to receive the value, so we can just die + destroy(); + return; + } + while (Callback::next != this) + Callback::next->error(this->error_state); + + if (!--promises && !futures) + destroy(); + } + void addPromiseRef() { promises++; } void addFutureRef() { futures++; } @@ -1610,5 +1624,6 @@ void bindDeterministicRandomToOpenssl(); #pragma clang diagnostic pop #endif +#include "flow/Coroutines.h" #include "flow/genericactors.actor.h" #endif diff --git a/flow/include/flow/genericactors.actor.h b/flow/include/flow/genericactors.actor.h index 3ee9e776e9..3e704e9441 100644 --- a/flow/include/flow/genericactors.actor.h +++ b/flow/include/flow/genericactors.actor.h @@ -40,6 +40,7 @@ #include #include "flow/flow.h" +#include "flow/CoroUtils.h" #include "flow/Knobs.h" #include "flow/Util.h" #include "flow/IndexedSet.h" diff --git a/flow/include/flow/network.h b/flow/include/flow/network.h index 2d2b0bb658..6e45ccc754 100644 --- a/flow/include/flow/network.h +++ b/flow/include/flow/network.h @@ -115,6 +115,7 @@ struct NetworkInfo { FlowLock* handshakeLock; NetworkInfo(); + ~NetworkInfo(); }; class IEventFD : public ReferenceCounted { diff --git a/flow/network.cpp b/flow/network.cpp index 22afb4c7cf..10294eddb1 100644 --- a/flow/network.cpp +++ b/flow/network.cpp @@ -448,3 +448,6 @@ TEST_CASE("/flow/network/ipV6Preferred") { } NetworkInfo::NetworkInfo() : handshakeLock(new FlowLock(FLOW_KNOBS->TLS_HANDSHAKE_LIMIT)) {} +NetworkInfo::~NetworkInfo() { + delete handshakeLock; +}