Merge pull request #11064 from apple/features/coroutines

Added C++ Coroutine support to Flow
This commit is contained in:
Jingyu Zhou 2023-11-22 09:58:32 -08:00 committed by GitHub
commit ee626409c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 4576 additions and 0 deletions

282
cmake/FindCoroutines.cmake Normal file
View File

@ -0,0 +1,282 @@
# Copyright (c) 2019-present, Facebook, Inc.
#
# This source code is licensed under the Apache License found in the
# LICENSE.txt file in the root directory of this source tree.
#[=======================================================================[.rst:
FindCoroutines
##############
This module supports the C++ standard support for coroutines. Use
the :imp-target:`std::coroutines` imported target to
Options
*******
The ``COMPONENTS`` argument to this module supports the following values:
.. find-component:: Experimental
:name: coro.Experimental
Allows the module to find the "experimental" Coroutines TS
version of the coroutines library. This is the library that should be
used with the ``std::experimental`` namespace.
.. find-component:: Final
:name: coro.Final
Finds the final C++20 standard version of coroutines.
If no components are provided, behaves as if the
:find-component:`coro.Final` component was specified.
If both :find-component:`coro.Experimental` and :find-component:`coro.Final` are
provided, first looks for ``Final``, and falls back to ``Experimental`` in case
of failure. If ``Final`` is found, :imp-target:`std::coroutines` and all
:ref:`variables <coro.variables>` will refer to the ``Final`` version.
Imported Targets
****************
.. imp-target:: std::coroutines
The ``std::coroutines`` imported target is defined when any requested
version of the C++ coroutines library has been found, whether it is
*Experimental* or *Final*.
If no version of the coroutines library is available, this target will not
be defined.
.. note::
This target has ``cxx_std_17`` as an ``INTERFACE``
:ref:`compile language standard feature <req-lang-standards>`. Linking
to this target will automatically enable C++17 if no later standard
version is already required on the linking target.
.. _coro.variables:
Variables
*********
.. variable:: CXX_COROUTINES_HAVE_COROUTINES
Set to ``TRUE`` when coroutines are supported in both the language and the
library.
.. variable:: CXX_COROUTINES_HEADER
Set to either ``coroutine`` or ``experimental/coroutine`` depending on
whether :find-component:`coro.Final` or :find-component:`coro.Experimental` was
found.
.. variable:: CXX_COROUTINES_NAMESPACE
Set to either ``std`` or ``std::experimental``
depending on whether :find-component:`coro.Final` or
:find-component:`coro.Experimental` was found.
Examples
********
Using `find_package(Coroutines)` with no component arguments:
.. code-block:: cmake
find_package(Coroutines REQUIRED)
add_executable(my-program main.cpp)
target_link_libraries(my-program PRIVATE std::coroutines)
#]=======================================================================]
if(TARGET std::coroutines)
# This module has already been processed. Don't do it again.
return()
endif()
include(CheckCXXCompilerFlag)
include(CMakePushCheckState)
include(CheckIncludeFileCXX)
include(CheckCXXSourceCompiles)
cmake_push_check_state()
set(CMAKE_REQUIRED_QUIET ${Coroutines_FIND_QUIETLY})
check_cxx_compiler_flag(/await _CXX_COROUTINES_SUPPORTS_MS_FLAG)
check_cxx_compiler_flag(/await:heapelide _CXX_COROUTINES_SUPPORTS_MS_HEAPELIDE_FLAG)
check_cxx_compiler_flag(-fcoroutines-ts _CXX_COROUTINES_SUPPORTS_TS_FLAG)
check_cxx_compiler_flag(-fcoroutines _CXX_COROUTINES_SUPPORTS_CORO_FLAG)
if(_CXX_COROUTINES_SUPPORTS_MS_FLAG)
set(_CXX_COROUTINES_EXTRA_FLAGS "/await")
if(_CXX_COROUTINES_SUPPORTS_MS_HEAPELIDE_FLAG AND CMAKE_SIZEOF_VOID_P GREATER_EQUAL 8)
list(APPEND _CXX_COROUTINES_EXTRA_FLAGS "/await:heapelide")
endif()
elseif(_CXX_COROUTINES_SUPPORTS_TS_FLAG)
set(_CXX_COROUTINES_EXTRA_FLAGS "-fcoroutines-ts")
elseif(_CXX_COROUTINES_SUPPORTS_CORO_FLAG)
set(_CXX_COROUTINES_EXTRA_FLAGS "-fcoroutines")
endif()
# Normalize and check the component list we were given
set(want_components ${Coroutines_FIND_COMPONENTS})
if(Coroutines_FIND_COMPONENTS STREQUAL "")
set(want_components Final)
endif()
# Warn on any unrecognized components
set(extra_components ${want_components})
list(REMOVE_ITEM extra_components Final Experimental)
foreach(component IN LISTS extra_components)
message(WARNING "Extraneous find_package component for Coroutines: ${component}")
endforeach()
# Detect which of Experimental and Final we should look for
set(find_experimental TRUE)
set(find_final TRUE)
if(NOT "Final" IN_LIST want_components)
set(find_final FALSE)
endif()
if(NOT "Experimental" IN_LIST want_components)
set(find_experimental FALSE)
endif()
if(find_final)
check_include_file_cxx("coroutine" _CXX_COROUTINES_HAVE_HEADER)
if(_CXX_COROUTINES_HAVE_HEADER)
check_cxx_source_compiles("#include <coroutine> \n typedef std::suspend_never blub; \nint main() {} " _CXX_COROUTINES_FINAL_HEADER_COMPILES)
set(_CXX_COROUTINES_HAVE_HEADER "${_CXX_COROUTINES_FINAL_HEADER_COMPILES}")
endif()
if(NOT _CXX_COROUTINES_HAVE_HEADER)
cmake_push_check_state()
set(CMAKE_REQUIRED_FLAGS "${_CXX_COROUTINES_EXTRA_FLAGS}")
check_include_file_cxx("coroutine" _CXX_COROUTINES_HAVE_HEADER_WITH_FLAG)
if(_CXX_COROUTINES_HAVE_HEADER_WITH_FLAG)
check_cxx_source_compiles("#include <coroutine> \n typedef std::suspend_never blub; \nint main() {} " _CXX_COROUTINES_FINAL_HEADER_COMPILES_WITH_FLAG)
set(_CXX_COROUTINES_HAVE_HEADER_WITH_FLAG "${_CXX_COROUTINES_FINAL_HEADER_COMPILES_WITH_FLAG}")
endif()
set(_CXX_COROUTINES_HAVE_HEADER "${_CXX_COROUTINES_HAVE_HEADER_WITH_FLAG}")
cmake_pop_check_state()
endif()
mark_as_advanced(_CXX_COROUTINES_HAVE_HEADER)
if(_CXX_COROUTINES_HAVE_HEADER)
# We found the non-experimental header. Don't bother looking for the
# experimental one.
set(find_experimental FALSE)
endif()
else()
set(_CXX_COROUTINES_HAVE_HEADER FALSE)
endif()
if(find_experimental)
check_include_file_cxx("experimental/coroutine" _CXX_COROUTINES_HAVE_EXPERIMENTAL_HEADER)
if(NOT _CXX_COROUTINES_HAVE_EXPERIMENTAL_HEADER)
cmake_push_check_state()
set(CMAKE_REQUIRED_FLAGS "${_CXX_COROUTINES_EXTRA_FLAGS}")
check_include_file_cxx("experimental/coroutine" _CXX_COROUTINES_HAVE_EXPERIMENTAL_HEADER_WITH_FLAG)
set(_CXX_COROUTINES_HAVE_EXPERIMENTAL_HEADER "${_CXX_COROUTINES_HAVE_EXPERIMENTAL_HEADER_WITH_FLAG}")
cmake_pop_check_state()
endif()
mark_as_advanced(_CXX_COROUTINES_HAVE_EXPERIMENTAL_HEADER)
else()
set(_CXX_COROUTINES_HAVE_EXPERIMENTAL_HEADER FALSE)
endif()
if(_CXX_COROUTINES_HAVE_HEADER)
set(_have_coro TRUE)
set(_coro_header coroutine)
set(_coro_namespace std)
elseif(_CXX_COROUTINES_HAVE_EXPERIMENTAL_HEADER)
set(_have_coro TRUE)
set(_coro_header experimental/coroutine)
set(_coro_namespace std::experimental)
else()
set(_have_coro FALSE)
endif()
set(CXX_COROUTINES_HAVE_COROUTINES ${_have_coro} CACHE BOOL "TRUE if we have the C++ coroutines feature")
set(CXX_COROUTINES_HEADER ${_coro_header} CACHE STRING "The header that should be included to obtain the coroutines APIs")
set(CXX_COROUTINES_NAMESPACE ${_coro_namespace} CACHE STRING "The C++ namespace that contains the coroutines APIs")
set(_found FALSE)
if(CXX_COROUTINES_HAVE_COROUTINES)
# We have some coroutines library available. Do link checks
string(CONFIGURE [[
#include <utility>
#include <@CXX_COROUTINES_HEADER@>
struct present {
struct promise_type {
int result;
present get_return_object() { return present{*this}; }
@CXX_COROUTINES_NAMESPACE@::suspend_never initial_suspend() { return {}; }
@CXX_COROUTINES_NAMESPACE@::suspend_always final_suspend() noexcept { return {}; }
void return_value(int i) { result = i; }
void unhandled_exception() {}
};
friend struct promise_type;
present(present&& that) : coro_(std::exchange(that.coro_, {})) {}
~present() { if(coro_) coro_.destroy(); }
bool await_ready() const { return true; }
void await_suspend(@CXX_COROUTINES_NAMESPACE@::coroutine_handle<>) const {}
int await_resume() const { return coro_.promise().result; }
private:
present(promise_type& promise)
: coro_(@CXX_COROUTINES_NAMESPACE@::coroutine_handle<promise_type>::from_promise(promise)) {}
@CXX_COROUTINES_NAMESPACE@::coroutine_handle<promise_type> coro_;
};
present f(int n) {
if (n < 2)
co_return 1;
else
co_return n * co_await f(n - 1);
}
int main() {
return f(5).await_resume() != 120;
}
]] code @ONLY)
# Try to compile a simple coroutines program without any compiler flags
check_cxx_source_compiles("${code}" CXX_COROUTINES_NO_AWAIT_NEEDED)
set(can_link ${CXX_COROUTINES_NO_AWAIT_NEEDED})
if(NOT CXX_COROUTINES_NO_AWAIT_NEEDED)
# Add the -fcoroutines-ts (or /await) flag
set(CMAKE_REQUIRED_FLAGS "${_CXX_COROUTINES_EXTRA_FLAGS}")
check_cxx_source_compiles("${code}" CXX_COROUTINES_AWAIT_NEEDED)
set(can_link "${CXX_COROUTINES_AWAIT_NEEDED}")
endif()
if(can_link)
add_library(std::coroutines INTERFACE IMPORTED)
set(_found TRUE)
if(CXX_COROUTINES_NO_AWAIT_NEEDED)
# Nothing to add...
elseif(CXX_COROUTINES_AWAIT_NEEDED)
target_compile_options(std::coroutines INTERFACE ${_CXX_COROUTINES_EXTRA_FLAGS})
endif()
else()
set(CXX_COROUTINES_HAVE_COROUTINES FALSE)
endif()
endif()
cmake_pop_check_state()
set(Coroutines_FOUND ${_found} CACHE BOOL "TRUE if we can compile and link a program using std::coroutines" FORCE)
if(Coroutines_FIND_REQUIRED AND NOT Coroutines_FOUND)
message(FATAL_ERROR "Cannot compile simple program using std::coroutines. Is C++17 or later activated?")
endif()

700
design/coroutines.md Normal file
View File

@ -0,0 +1,700 @@
# Coroutines in Flow
* [Introduction](#Introduction)
* [Coroutines vs ACTORs](#coroutines-vs-actors)
* [Basic Types](#basic-types)
* [Choose-When](#choose-when)
* [Execution in when-expressions](#execution-in-when-expressions)
* [Waiting in When-Blocks](#waiting-in-when-blocks)
* [Generators](#generators)
* [Generators and ranges](#generators-and-ranges)
* [Eager vs Lazy Execution](#eager-vs-lazy-execution)
* [Generators vs Promise Streams](#generators-vs-promise-streams)
* [Uncancellable](#uncancellable)
* [Porting ACTOR's to C++ Coroutines](#porting-actors-to-c-coroutines)
* [Lifetime of Locals](#lifetime-of-locals)
* [Unnecessary Helper Actors](#unnecessary-helper-actors)
* [Replace Locals with Temporaries](#replace-locals-with-temporaries)
* [Don't Wait in Error-Handlers](#dont-wait-in-error-handlers)
* [Make Static Functions Class Members](#make-static-functions-class-members)
* [Initialization of Locals](#initialization-of-locals)
## Introduction
In the past Flow implemented an actor mode by shipping its own compiler which would
extend the C++ language with a few additional keywords. This, while still supported,
is deprecated in favor of the standard C++20 coroutines.
Coroutines are meant to be simple, look like serial code, and be easy to reason about.
As simple example for a coroutine function can look like this:
```c++
Future<double> simpleCoroutine() {
double begin = now();
co_await delay(1.0);
co_return now() - begin;
}
```
This document assumes some familiarity with Flow. As of today, actors and coroutines
can be freely mixed, but new code should be written using coroutines.
## Coroutines vs ACTORs
It is important to understand that C++ coroutine support doesn't change anything in Flow: they are not a replacement
of Flow but they replace the actor compiler with a C++ compiler. This means, that the network loop, all Flow types,
the RPC layer, and the simulator all remain unchanged. A coroutine simply returns a special `SAV<T>` which has handle
to a coroutine.
## Basic Types
As defined in the C++20 standard, a function is a coroutine if its body contains at least one `co_await`, `co_yield`,
or `co_return` statement. However, in order for this to work, the return type needs an underlying coroutine
implementation. Flow provides these for the following types:
* `Future<T>` is the primary type we use for coroutines. A coroutine returning
`Future<T>` is allowed to `co_await` other coroutines and it can `co_return`
a single value. `co_yield` is not implemented by this type.
* A special case is `Future<Void>`. Void-Futures are what a user would probably
expect `Future<>` to be (it has this type for historical reasons and to
provide compatibility with old Flow `ACTOR`s). A coroutine with return type
`Future<Void>` must not return anything. So either the coroutine can run until
the end, or it can be terminated by calling `co_return`.
* `Generator<T>` can return a stream of values. However, they can't `co_await`
other coroutines. These are useful for streams where the values are lazily
computed but don't involve any IO.
* `AsyncGenerator<T>` is similar to `Generator<T>` in that it can return a stream
of values, but in addition to that it can also `co_await` other coroutines.
Due to that, they're slightly less efficient than `Generator<T>`.
`AsyncGenerator<T>` should be used whenever values should be lazily generated
AND need IO. It is an alternative to `PromiseStream<T>`, which can be more efficient, but is
more intuitive to use correctly.
A more detailed explanation of `Generator<T>` and `AsyncGenerator<T>` can be
found further down.
## Choose-When
In actor compiled code we were able to use the keywords `choose` and `when` to wait on a
statically known number of futures and execute corresponding code. Something like this:
```c++
choose {
when(wait(future1)) {
// do something
}
when(Foo f = wait(foo())) {
// do something else
}
}
```
Since this is a compiler functionality, we can't use this with C++ coroutines. We could
keep only this feature around, but only using standard C++ is desirable. So instead, we
introduce a new `class` called `Choose` to achieve something very similar:
```c++
co_await Choose()
.When(future1, [](Void& const) {
// do something
})
.When(foo(), [](Foo const& f) {
// do something else
}).Run();
```
While `Choose` and `choose` behave very similarly, there are some minor differences between
the two. These are explained below.
### Execution in when-expressions
In the above example, there is one, potentially important difference between the old and new
style: in the statement `when(Foo f = wait(foo()))` is only executed if `future1` is not ready.
Depending on what the intent of the statement is, this could be desirable. Since `Choose::When`
is a normal method, `foo()` will be evaluated whether the statement is already done or not.
This can be worked around by passing a lambda that returns a Future instead:
```c++
co_await Choose()
.When(future1, [](Void& const) {
// do something
})
.When([](){ return foo() }, [](Foo const& f) {
// do something else
}).Run();
```
The implementation of `When` will guarantee that this lambda will only be executed if all previous
`When` calls didn't receive a ready future.
## Waiting in When-Blocks
In FDB we sometimes see this pattern:
```c++
loop {
choose {
when(RequestA req = waitNext(requestAStream.getFuture())) {
wait(handleRequestA(req));
}
when(RequestB req = waitNext(requestBStream.getFuture())) {
wait(handleRequestb(req));
}
//...
}
}
```
This is not possible to do with `Choose`. However, this is done deliberately as the above is
considered an antipattern: This means that we can't serve two requests concurrently since the loop
won't execute until the request has been served. Instead, this should be written like this:
```c++
state ActorCollection actors(false);
loop {
choose {
when(RequestA req = waitNext(requestAStream.getFuture())) {
actors.add(handleRequestA(req));
}
when(RequestB req = waitNext(requestBStream.getFuture())) {
actors.add(handleRequestb(req));
}
//...
when(wait(actors.getResult())) {
// this only makes sure that errors are thrown correctly
UNREACHABLE();
}
}
}
```
And so the above can easily be rewritten using `Choose`:
```c++
ActorCollection actors(false);
loop {
co_await Choose()
.When(requestAStream.getFuture(), [&actors](RequestA const& req) {
actors.add(handleRequestA(req));
})
.When(requestBStream.getFuture(), [&actors](RequestB const& req) {
actors.add(handleRequestB(req));
})
.When(actors.getResult(), [](Void const&) {
UNREACHABLE();
}).run();
}
```
However, often using `choose`-`when` (or `Choose().When`) is overkill and other facilities like `quorum` and
`operator||` should be used instead. For example this:
```c++
choose {
when(R res = wait(f1)) {
return res;
}
when(wait(timeout(...))) {
throw io_timeout();
}
}
```
Should be written like this:
```c++
co_await (f1 || timeout(...));
if (f1.isReady()) {
co_return f1.get();
}
throw io_timeout();
```
(The above could also be packed into a helper function in `genericactors.actor.h`).
## Generators
With C++ coroutines we introduce two new basic types in Flow: `Generator<T>` and `AsyncGenerator<T>`. A generator is a
special type of coroutine, which can return multiple values.
`Generator<T>` and `AsyncGenerator<T>` implement a different interface and serve a very different purpose.
`Generator<T>` conforms to the `input_iterator` trait -- so it can be used like a normal iterator (with the exception
that copying the iterator has a different semantics). This also means that it can be used with the new `ranges`
library in STL which was introduced in C++20.
`AsyncGenerator<T>` implements the `()` operator which returns a new value every time it is called. However, this value
HAS to be waited for (dropping it and attempting to call `()` again will result in undefined behavior!). This semantic
difference allows an author to mix `co_await` and `co_yield` statements in a coroutine returning `AsyncGenerator<T>`.
Since generators can produce infinitely long streams, they can be useful to use in places where we'd otherwise use a
more complex in-line loop. For example, consider the code in `masterserver.actor.cpp` that is responsible generate
version numbers. The logic for this code is currently in a long function. With a `Generator<T>` it can be isolated to
one simple coroutine (which can be a direct member of `MasterData`). A simplified version of such a generator could
look as follows:
```c++
Generator<Version> MasterData::versionGenerator() {
auto prevVersion = lastEpochEnd;
auto lastVersionTime = now();
while (true) {
auto t1 = now();
Version toAdd =
std::max<Version>(1,
std::min<Version>(SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS,
SERVER_KNOBS->VERSIONS_PER_SECOND * (t1 - self->lastVersionTime)));
lastVersionTime = t1;
co_yield prevVersion + toAdd;
prevVersion += toAdd;
}
}
```
Now that the logic to compute versions is separated, `MasterData` can simply create an instance of `Generator<Version>`
by calling `auto vGenerator = MasterData::versionGenerator();` (and possibly storing that as a class member). It can
then access the current version by calling `*vGenerator` and go to the next generator by incrementing the iterator
(`++vGenerator`).
`AsyncGenerator<T>` should be used in some places where we used promise streams before (though not all of them, this
topic is discussed a bit later). For example:
```c++
template <class T, class F>
AsyncGenerator<T> filter(AsyncGenerator<T> gen, F pred) {
while (gen) {
auto val = co_await gen();
if (pred(val)) {
co_yield val;
}
}
}
```
Note how much simpler this function is compared to the old flow function:
```c++
ACTOR template <class T, class F>
Future<Void> filter(FutureStream<T> input, F pred, PromiseStream<T> output) {
loop {
try {
T nextInput = waitNext(input);
if (pred(nextInput))
output.send(nextInput);
} catch (Error& e) {
if (e.code() == error_code_end_of_stream) {
break;
} else
throw;
}
}
output.sendError(end_of_stream());
return Void();
}
```
A `FutureStream` can be converted into an `AsyncGenerator` by using a simple helper function:
```c++
template <class T>
AsyncGenerator<T> toGenerator(FutureStream<T> stream) {
loop {
try {
co_yield co_await stream;
} catch (Error& e) {
if (e.code() == error_code_end_of_stream) {
co_return;
}
throw;
}
}
}
```
### Generators and Ranges
`Generator<T>` can be used like an input iterator. This means, that it can also be used with `std::ranges`. Consider
the following coroutine:
```c++
// returns base^0, base^1, base^2, ...
Generator<double> powersOf(double base) {
double curr = 1;
loop {
co_yield curr;
curr *= base;
}
}
```
We can use this now to generate views. For example:
```c++
for (auto v : generatorRange(powersOf(2))
| std::ranges::views::filter([](auto v) { return v > 10; })
| std::ranges::views::take(10)) {
fmt::print("{}\n", v);
}
```
The above would print all powers of two between 10 and 2^10.
### Eager vs Lazy Execution
One major difference between async generators and tasks (coroutines returning only one value through `Future`) is the
execution policy: An async generator will immediately suspend when it is called while a task will immediately start
execution and needs to be explicitly scheduled.
This is a conscious design decision. Lazy execution makes it much simpler to reason about memory ownership. For example,
the following is ok:
```c++
Generator<StringRef> randomStrings(int minLen, int maxLen) {
Arena arena;
auto buffer = new (arena) uint8_t[maxLen + 1];
while (true) {
auto sz = deterministicRandom()->randomInt(minLen, maxLen + 1);
for (int i = 0; i < sz; ++i) {
buffer[i] = deterministicRandom()->randomAlphaNumeric();
}
co_yield StringRef(buffer, sz);
}
}
```
The above coroutine returns a stream of random strings. The memory is owned by the coroutine and so it always returns
a `StringRef` and then reuses the memory in the next iteration. This makes this generator very cheap to use, as it only
does one allocation in its lifetime. With eager execution, this would be much harder to write (and reason about): the
coroutine would immediately generate a string and then eagerly compute the next one when the string is retrieved.
However, in Flow a `co_yield` is guarantee to suspend the coroutine until the value was consumed (this is not generally
a guarantee with `co_yield` -- C++ coroutines give the implementor a great degree of freedom over decisions like this).
### Generators vs Promise Streams
Flow provides another mechanism to send streams of messages between actors: `PromiseStream<T>`. In fact,
`AsyncGenerator<T>` uses `PromiseStream<T>` internally. So when should one be used over the other?
As a general rule of thumb: whenever possible, use `Generator<T>`, if not, use `AsyncGenerator<T>` if in doubt.
For pure computation it almost never makes sense to use a `PromiseStream<T>` (the only exception is if computation
can be expensive enough that `co_await yield()` becomes necessary). `Generator<T>` is more lightweight and therefore
usually more efficient. It is also easier to use.
When it comes to IO it becomes a bit more tricky. Assume we want to scan a file on disk, and we want to read it in
4k blocks. This can be done quite elegantly using a coroutine:
```c++
AsyncGenerator<Standalone<StringRef>> blockScanner(Reference<IAsyncFile> file) {
auto sz = co_await file->size();
decltype(sz) offset = 0;
constexpr decltype(sz) blockSize = 4*1024;
while (offset < sz) {
Arena arena;
auto block = new (arena) int8_t[blockSize];
auto toRead = std::min(sz - offset, blockSize);
auto r = co_await file->read(block, toRead, offset);
co_yield Standalone<StringRef>(StringRef(block, r), arena);
offset += r;
}
}
```
The problem with the above generator though, is that we only start reading when the generator is invoked. If consuming
the block takes sometimes a long time (for example because it has to be written somewhere), each call will take as long
as the disk latency is for a read.
What if we want to hide this latency? In other words: what if we want to improve throughput and end-to-end latency by
prefetching?
Doing this with a generator, while not trivial, is possible. But here it might be easier to use a `PromiseStream`
(we can even reuse the above generator):
```c++
Future<Void> blockScannerWithPrefetch(Reference<IAsyncFile> file,
PromiseStream<Standalone<StringRef> promise,
FlowLock lock) {
auto generator = blockScanner(file);
while (generator) {
{
FlowLock::Releaser _(co_await lock.take());
try {
promise.send(co_await generator());
} catch (Error& e) {
promise.sendError(e);
co_return;
}
}
// give caller opportunity to take the lock
co_await yield();
}
}
```
With the above the caller can control the prefetching dynamically by taking the lock if the queue becomes too full.
## Uncancellable
By default, a coroutine runs until it is either done (reaches the end of the function body, a `co_return` statement,
or throws an exception) or the last `Future<T>` object referencing that object is being dropped. The second use-case is
implemented as follows:
1. When the future count of a coroutine goes to `0`, the coroutine is immediately resumed and `actor_cancelled` is
thrown within that coroutine (this allows the coroutine to do some cleanup work).
2. Any attempt to run `co_await expr` will immediately throw `actor_cancelled`.
However, some coroutines aren't safe to be cancelled. This usually concerns disk IO operations. With `ACTOR` we could
either have a return-type `void` or use the `UNCANCELLABLE` keyword to change this behavior: in this case, calling
`Future<T>::cancel()` would be a no-op and dropping all futures wouldn't cause cancellation.
However, with C++ coroutines, this won't work:
* We can't introduce new keywords in pure C++ (so `UNCANCELLABLE` would require some preprocessing).
* Implementing a `promise_type` for `void` isn't a good idea, as this would make any `void`-function potentially a
coroutine.
However, this can also be seen as an opportunity: uncancellable actors are always a bit tricky to use, since we need to
make sure that the caller keeps all memory alive that the uncancellable coroutine might reference until it is done.
Because of that, whenever someone calls a coroutine, they need to be extra careful. However, someone might not know that
the coroutine they call is uncancellable.
We address this problem with the following definition:
---
*Definition*:
A coroutine is uncancellable if the first argument (or the second, if the coroutine is a class-member) is of type
`Uncancellable`
---
The definition of `Uncancellable` is trivial: `struct Uncancellable {};` -- it is simply used as a marker. So now, if
a user calls an uncancellable coroutine, it will be obvious on the caller side. For example the following is *never*
uncancellable:
```c++
co_await foo();
```
But this one is:
```c++
co_await bar(Uncancellable());
```
## Porting `ACTOR`'s to C++ Coroutines
If you have an existing `ACTOR`, you can port it to a C++ coroutine by following these steps:
1. Remove `ACTOR` keyword.
2. If the actor is marked with `UNCANCELLABLE`, remove it and make the first argument `Uncancellable`. If the return
type of the actor is `void` make it `Future<Void>` instead and add an `Uncancellable` as the first argument.
3. Remove all `state` modifiers from local variables.
4. Replace all `wait(expr)` with `co_await expr`.
5. Remove all `waitNext(expr)` with `co_await expr`.
6. Rewrite existing `choose-when` statements using the `Choose` class.
In addition, the following things should be looked out for:
### Lifetime of locals
Consider this code:
```c++
Local foo;
wait(bar());
...
```
`foo` will be destroyed right after the `wait`-expression. However, after making this a coroutine:
```c++
Local foo;
co_await bar();
...
```
`foo` will stay alive until we leave the scope. This is better (as it is more intuitive and follows standard C++), but
in some weird corner-cases code might depend on the semantic that locals get destroyed when we call into `wait`. Look
out for things where destructors do semantically important work (like in `FlowLock::Releaser`).
### Unnecessary Helper Actors
In `flow/genericactors.actor.h` we have a number of useful helpers. Some of them are also useful with C++ coroutines,
others add unnecessary overhead. Look out for those and remove calls to it. The most important ones are `success` and
`store`.
```c++
wait(success(f));
```
becomes
```c++
co_await f;
```
and
```c++
wait(store(v, f));
```
becomes
```c++
v = co_await f;
```
### Replace Locals with Temporaries
In certain places we use locals just to work around actor compiler limitations. Since locals use up space in the
coroutine object they should be removed wherever it makes sense (only if it doesn't make the code less readable!).
For example:
```c++
Foo f = wait(foo);
bar(f);
```
might become
```c++
bar(co_await foo);
```
### Don't Wait in Error-Handlers
Using `co_await` in an error-handler produces a compilation error in C++. However, this was legal with `ACTOR`. There
is no general best way of addressing this issue, but usually it's quite easy to move the `co_await` expression out of
the `catch`-block.
One place where we use this pattern a lot if in our transaction retry loop:
```c++
state ReadYourWritesTransaction tr(db);
loop {
try {
Value v = wait(tr.get(key));
tr.set(key2, val2);
wait(tr.commit());
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
}
```
Luckily, with coroutines, we can do one better: generalize the retry loop. The above could look like this:
```c++
co_await db.run([&](ReadYourWritesTransaction* tr) -> Future<Void> {
Value v = wait(tr.get(key));
tr.set(key2, val2);
wait(tr.commit());
});
```
A possible implementation of `Database::run` would be:
```c++
template <std:invocable<ReadYourWritesTransaction*> Fun>
Future<Void> Database::run(Fun fun) {
ReadYourWritesTransaction tr(*this);
Future<Void> onError;
while (true) {
if (onError.isValid()) {
co_await onError;
onError = Future<Void>();
}
try {
co_await fun(&tr);
} catch (Error& e) {
onError = tr.onError(e);
}
}
}
```
### Make Static Functions Class Members
With actors, we often see the following pattern:
```c++
struct Foo : IFoo {
ACTOR static Future<Void> bar(Foo* self) {
// use `self` here to access members of `Foo`
}
Future<Void> bar() override {
return bar(this);
}
};
```
This boilerplate is necessary, because `ACTOR`s can't be class members: the actor compiler will generate another
`struct` and move the code there -- so `this` will point to the actor state and not to the class instance.
With C++ coroutines, this limitation goes away. So a cleaner (and slightly more efficient) implementation of the above
is:
```c++
struct Foo : IFoo {
Future<Void> bar() override {
// `this` can be used like in any non-coroutine. `co_await` can be used.
}
};
```
### Initialization of Locals
There is one very subtle and hard to spot difference between `ACTOR` and a coroutine: the way some local variables are
initialized. Consider the following code:
```c++
struct SomeStruct {
int a;
bool b;
};
ACTOR Future<Void> someActor() {
// beginning of body
state SomeStruct someStruct;
// rest of body
}
```
For state variables, the actor-compiler generates the following code to initialize `SomeStruct someStruct`:
```c++
someStruct = SomeStruct();
```
This, however, is different from what might expect since now the default constructor is explicitly called. This means
if the code is translated to:
```c++
Future<Void> someActor() {
// beginning of body
SomeStruct someStruct;
// rest of body
}
```
initialization will be different. The exact equivalent instead would be something like this:
```c++
Future<Void> someActor() {
// beginning of body
SomeStruct someStruct{}; // auto someStruct = SomeStruct();
// rest of body
}
```
If the struct `SomeStruct` would initialize its primitive members explicitly (for example by using `int a = 0;` and
`bool b = false`) this would be a non-issue. And explicit initialization is probably the right fix here. Sadly, it
doesn't seem like UBSAN finds these kind of subtle bugs.
Another difference is, that if a `state` variables might be initialized twice: once at the creation of the actor using
the default constructor and a second time at the point where the variable is initialized in the code. With C++
coroutines we now get the expected behavior, which is better, but nonetheless a potential behavior change.

View File

@ -1,4 +1,5 @@
add_subdirectory(tutorial)
add_subdirectory(coro_tutorial)
set(SPHINX_DOCUMENT_DIR "${CMAKE_SOURCE_DIR}/documentation/sphinx")

View File

@ -0,0 +1,2 @@
add_flow_target(EXECUTABLE NAME coro_tutorial SRCS tutorial.cpp)
target_link_libraries(coro_tutorial PUBLIC fdbclient)

View File

@ -0,0 +1,714 @@
/*
* tutorial.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2023 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fmt/format.h"
#include "flow/flow.h"
#include "flow/Platform.h"
#include "flow/DeterministicRandom.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ReadYourWrites.h"
#include "flow/TLSConfig.actor.h"
#include "fdbrpc/Net2FileSystem.h"
#include <functional>
#include <unordered_map>
#include <memory>
#include <iostream>
using namespace std::literals::string_literals;
using namespace std::literals::string_view_literals;
NetworkAddress serverAddress;
enum TutorialWellKnownEndpoints {
WLTOKEN_SIMPLE_KV_SERVER = WLTOKEN_FIRST_AVAILABLE,
WLTOKEN_ECHO_SERVER,
WLTOKEN_COUNT_IN_TUTORIAL
};
// this is a simple actor that will report how long
// it is already running once a second.
Future<Void> simpleTimer() {
// we need to remember the time when we first
// started.
double start_time = g_network->now();
loop {
co_await delay(1.0);
std::cout << format("Time: %.2f\n", g_network->now() - start_time);
}
}
// A actor that demonstrates how choose-when
// blocks work.
Future<Void> someFuture(Future<int> ready) {
// loop choose {} works as well here - the braces are optional
loop {
co_await Choose()
.When(delay(0.5), [](Void const&) { std::cout << "Still waiting...\n"; })
.When(ready, [](int const& r) { std::cout << format("Ready %d\n", r); })
.run();
}
}
Future<Void> promiseDemo() {
Promise<int> promise;
Future<Void> f = someFuture(promise.getFuture());
co_await delay(3.0);
promise.send(2);
co_await f;
}
Future<Void> eventLoop(AsyncTrigger* trigger) {
loop {
co_await Choose()
.When(delay(0.5), [](Void const&) { std::cout << "Still waiting...\n"; })
.When(trigger->onTrigger(), [](Void const&) { std::cout << "Triggered!\n"; })
.run();
}
}
Future<Void> triggerDemo() {
int runs = 1;
AsyncTrigger trigger;
auto triggerLoop = eventLoop(&trigger);
while (++runs < 10) {
co_await delay(1.0);
std::cout << "trigger..";
trigger.trigger();
}
std::cout << "Done.";
}
struct EchoServerInterface {
constexpr static FileIdentifier file_identifier = 3152015;
RequestStream<struct GetInterfaceRequest> getInterface;
RequestStream<struct EchoRequest> echo;
RequestStream<struct ReverseRequest> reverse;
RequestStream<struct StreamRequest> stream;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, echo, reverse, stream);
}
};
struct GetInterfaceRequest {
constexpr static FileIdentifier file_identifier = 12004156;
ReplyPromise<EchoServerInterface> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
}
};
struct EchoRequest {
constexpr static FileIdentifier file_identifier = 10624019;
std::string message;
// this variable has to be called reply!
ReplyPromise<std::string> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, message, reply);
}
};
struct ReverseRequest {
constexpr static FileIdentifier file_identifier = 10765955;
std::string message;
// this variable has to be called reply!
ReplyPromise<std::string> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, message, reply);
}
};
struct StreamReply : ReplyPromiseStreamReply {
constexpr static FileIdentifier file_identifier = 440804;
int index = 0;
StreamReply() = default;
explicit StreamReply(int index) : index(index) {}
size_t expectedSize() const { return 2e6; }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, ReplyPromiseStreamReply::sequence, index);
}
};
struct StreamRequest {
constexpr static FileIdentifier file_identifier = 5410805;
ReplyPromiseStream<StreamReply> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
}
};
uint64_t tokenCounter = 1;
Future<Void> echoServer() {
EchoServerInterface echoServer;
echoServer.getInterface.makeWellKnownEndpoint(WLTOKEN_ECHO_SERVER, TaskPriority::DefaultEndpoint);
ActorCollection requests;
loop {
try {
co_await Choose()
.When(requests.getResult(),
[](Void const&) {
// An actor collection with no constructor arguments or `false` as it's constructor argument
// will never finish. However, `getResult` will throw if any of the Futures we pass to it
// throw. So we have to wait on it, but we can assert that it either throws or is never ready
UNREACHABLE();
})
.When(echoServer.getInterface.getFuture(),
[&echoServer](GetInterfaceRequest const& req) { req.reply.send(echoServer); })
.When(echoServer.echo.getFuture(), [](EchoRequest const& req) { req.reply.send(req.message); })
.When(echoServer.reverse.getFuture(),
[](ReverseRequest const& req) {
req.reply.send(std::string(req.message.rbegin(), req.message.rend()));
})
.When(echoServer.stream.getFuture(),
[&requests](StreamRequest const& req) {
requests.add([](StreamRequest req) -> Future<Void> {
req.reply.setByteLimit(1024);
int i = 0;
for (; i < 100; ++i) {
co_await req.reply.onReady();
std::cout << "Send " << i << std::endl;
req.reply.send(StreamReply{ i });
}
req.reply.sendError(end_of_stream());
}(req));
})
.run();
} catch (Error& e) {
if (e.code() != error_code_operation_obsolete) {
fprintf(stderr, "Error: %s\n", e.what());
throw e;
}
}
}
}
Future<Void> echoClient() {
EchoServerInterface server;
server.getInterface =
RequestStream<GetInterfaceRequest>(Endpoint::wellKnown({ serverAddress }, WLTOKEN_ECHO_SERVER));
server = co_await server.getInterface.getReply(GetInterfaceRequest());
EchoRequest echoRequest;
echoRequest.message = "Hello World";
std::string echoMessage = co_await server.echo.getReply(echoRequest);
std::cout << format("Sent %s to echo, received %s\n", "Hello World", echoMessage.c_str());
ReverseRequest reverseRequest;
reverseRequest.message = "Hello World";
std::string reverseString = co_await server.reverse.getReply(reverseRequest);
std::cout << format("Sent %s to reverse, received %s\n", "Hello World", reverseString.c_str());
ReplyPromiseStream<StreamReply> stream = server.stream.getReplyStream(StreamRequest{});
int j = 0;
try {
loop {
StreamReply rep = co_await stream.getFuture();
std::cout << "Rep: " << rep.index << std::endl;
ASSERT(rep.index == j++);
}
} catch (Error& e) {
ASSERT(e.code() == error_code_end_of_stream || e.code() == error_code_connection_failed);
}
}
struct SimpleKeyValueStoreInterface {
constexpr static FileIdentifier file_identifier = 8226647;
RequestStream<struct GetKVInterface> connect;
RequestStream<struct GetRequest> get;
RequestStream<struct SetRequest> set;
RequestStream<struct ClearRequest> clear;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, connect, get, set, clear);
}
};
struct GetKVInterface {
constexpr static FileIdentifier file_identifier = 8062308;
ReplyPromise<SimpleKeyValueStoreInterface> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
}
};
struct GetRequest {
constexpr static FileIdentifier file_identifier = 6983506;
std::string key;
ReplyPromise<std::string> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, key, reply);
}
};
struct SetRequest {
constexpr static FileIdentifier file_identifier = 7554186;
std::string key;
std::string value;
ReplyPromise<Void> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, key, value, reply);
}
};
struct ClearRequest {
constexpr static FileIdentifier file_identifier = 8500026;
std::string from;
std::string to;
ReplyPromise<Void> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, from, to, reply);
}
};
Future<Void> kvStoreServer() {
SimpleKeyValueStoreInterface inf;
std::map<std::string, std::string> store;
inf.connect.makeWellKnownEndpoint(WLTOKEN_SIMPLE_KV_SERVER, TaskPriority::DefaultEndpoint);
loop {
co_await Choose()
.When(inf.connect.getFuture(),
[&inf](GetKVInterface const& req) {
std::cout << "Received connection attempt\n";
req.reply.send(inf);
})
.When(inf.get.getFuture(),
[&store](GetRequest const& req) {
auto iter = store.find(req.key);
if (iter == store.end()) {
req.reply.sendError(io_error());
} else {
req.reply.send(iter->second);
}
})
.When(inf.set.getFuture(),
[&store](SetRequest const& req) {
store[req.key] = req.value;
req.reply.send(Void());
})
.When(inf.clear.getFuture(),
[&store](ClearRequest const& req) {
auto from = store.lower_bound(req.from);
auto to = store.lower_bound(req.to);
while (from != store.end() && from != to) {
auto next = from;
++next;
store.erase(from);
from = next;
}
req.reply.send(Void());
})
.run();
}
}
Future<SimpleKeyValueStoreInterface> connect() {
std::cout << format("%llu: Connect...\n", uint64_t(g_network->now()));
SimpleKeyValueStoreInterface c;
c.connect = RequestStream<GetKVInterface>(Endpoint::wellKnown({ serverAddress }, WLTOKEN_SIMPLE_KV_SERVER));
SimpleKeyValueStoreInterface result = co_await c.connect.getReply(GetKVInterface());
std::cout << format("%llu: done..\n", uint64_t(g_network->now()));
co_return result;
}
Future<Void> kvSimpleClient() {
SimpleKeyValueStoreInterface server = co_await connect();
std::cout << format("Set %s -> %s\n", "foo", "bar");
SetRequest setRequest;
setRequest.key = "foo";
setRequest.value = "bar";
co_await server.set.getReply(setRequest);
GetRequest getRequest;
getRequest.key = "foo";
std::string value = co_await server.get.getReply(getRequest);
std::cout << format("get(%s) -> %s\n", "foo", value.c_str());
}
Future<Void> kvClient(SimpleKeyValueStoreInterface server, std::shared_ptr<uint64_t> ops) {
auto timeout = delay(20);
int rangeSize = 2 << 12;
loop {
SetRequest setRequest;
setRequest.key = std::to_string(deterministicRandom()->randomInt(0, rangeSize));
setRequest.value = "foo";
co_await server.set.getReply(setRequest);
++(*ops);
try {
GetRequest getRequest;
getRequest.key = std::to_string(deterministicRandom()->randomInt(0, rangeSize));
co_await server.get.getReply(getRequest);
++(*ops);
} catch (Error& e) {
if (e.code() != error_code_io_error) {
throw e;
}
}
int from = deterministicRandom()->randomInt(0, rangeSize);
ClearRequest clearRequest;
clearRequest.from = std::to_string(from);
clearRequest.to = std::to_string(from + 100);
co_await server.clear.getReply(clearRequest);
++(*ops);
if (timeout.isReady()) {
// we are done
co_return;
}
}
}
Future<Void> throughputMeasurement(std::shared_ptr<uint64_t> operations) {
loop {
co_await delay(1.0);
std::cout << format("%llu op/s\n", *operations);
*operations = 0;
}
}
Future<Void> multipleClients() {
SimpleKeyValueStoreInterface server = co_await connect();
auto ops = std::make_shared<uint64_t>(0);
std::vector<Future<Void>> clients(100);
for (auto& f : clients) {
f = kvClient(server, ops);
}
auto done = waitForAll(clients);
co_await (done || throughputMeasurement(ops));
co_return;
}
std::string clusterFile = "fdb.cluster";
Future<Void> logThroughput(int64_t* v, Key* next) {
loop {
int64_t last = *v;
co_await delay(1);
fmt::print("throughput: {} bytes/s, next: {}\n", *v - last, printable(*next).c_str());
}
}
Future<Void> fdbClientStream() {
Database db = Database::createDatabase(clusterFile, 300);
Transaction tx(db);
Key next;
int64_t bytes = 0;
Future<Void> logFuture = logThroughput(&bytes, &next);
loop {
Future<Void> onError;
PromiseStream<Standalone<RangeResultRef>> results;
try {
Future<Void> stream = tx.getRangeStream(results,
KeySelector(firstGreaterOrEqual(next), next.arena()),
KeySelector(firstGreaterOrEqual(normalKeys.end)),
GetRangeLimits());
loop {
Standalone<RangeResultRef> range = co_await results.getFuture();
if (range.size()) {
bytes += range.expectedSize();
next = keyAfter(range.back().key);
}
}
} catch (Error& e) {
if (e.code() == error_code_end_of_stream) {
break;
}
onError = tx.onError(e);
}
co_await onError;
}
}
bool transactionDone(std::convertible_to<bool> auto v) {
return v;
}
bool transaction_done(void) {
return true;
}
template <class DB, class Fun>
Future<Void> runTransactionWhile(DB const& db, Fun f) {
Transaction tr(db);
loop {
Future<Void> onError;
try {
if (transactionDone(co_await f(&tr))) {
co_return;
}
} catch (Error& e) {
onError = tr.onError(e);
}
co_await onError;
}
}
template <class DB, class Fun>
Future<Void> runTransaction(DB const& db, Fun f) {
return runTransactionWhile(db, [&f](Transaction* tr) -> Future<bool> {
co_await f(tr);
co_return true;
});
}
template <class DB, class Fun>
Future<Void> runRYWTransaction(DB const& db, Fun f) {
Future<Void> onError;
ReadYourWritesTransaction tr(db);
loop {
if (onError.isValid()) {
co_await onError;
onError = Future<Void>();
}
try {
co_await f(&tr);
co_return;
} catch (Error& e) {
onError = tr.onError(e);
}
}
}
Future<Void> fdbClientGetRange() {
Database db = Database::createDatabase(clusterFile, 300);
Transaction tx(db);
Key next;
int64_t bytes = 0;
Future<Void> logFuture = logThroughput(&bytes, &next);
co_await runTransactionWhile(db, [&bytes, &next](Transaction* tr) -> Future<bool> {
RangeResult range =
co_await tr->getRange(KeySelector(firstGreaterOrEqual(next), next.arena()),
KeySelector(firstGreaterOrEqual(normalKeys.end)),
GetRangeLimits(GetRangeLimits::ROW_LIMIT_UNLIMITED, CLIENT_KNOBS->REPLY_BYTE_LIMIT));
bytes += range.expectedSize();
if (!range.more) {
co_return true;
}
next = keyAfter(range.back().key);
co_return false;
});
co_return;
}
Future<Void> fdbClient() {
co_await delay(30);
Database db = Database::createDatabase(clusterFile, 300);
std::string keyPrefix = "/tut/";
Key startKey;
KeyRef endKey = "/tut0"_sr;
int beginIdx = 0;
loop {
co_await runTransaction(db, [&](Transaction* tr) -> Future<Void> {
// this workload is stupidly simple:
// 1. select a random key between 1
// and 1e8
// 2. select this key plus the 100
// next ones
// 3. write 10 values in [k, k+100]
beginIdx = deterministicRandom()->randomInt(0, 1e8 - 100);
startKey = keyPrefix + std::to_string(beginIdx);
auto range = co_await tr->getRange(KeyRangeRef(startKey, endKey), 100);
for (int i = 0; i < 10; ++i) {
Key k = Key(keyPrefix + std::to_string(beginIdx + deterministicRandom()->randomInt(0, 100)));
tr->set(k, "foo"_sr);
}
co_await tr->commit();
std::cout << "Committed\n";
co_await delay(2.0);
co_return;
});
}
}
Future<Void> fdbStatusStresser() {
Database db = Database::createDatabase(clusterFile, 300);
Key statusJson(std::string("\xff\xff/status/json"));
loop {
co_await runRYWTransaction(db, [&statusJson](ReadYourWritesTransaction* tr) -> Future<Void> {
co_await tr->get(statusJson);
co_return;
});
}
}
AsyncGenerator<Optional<StringRef>> readBlocks(Reference<IAsyncFile> file, int64_t blockSize) {
auto sz = co_await file->size();
decltype(sz) offset = 0;
Arena arena;
auto block = new (arena) uint8_t[blockSize];
while (offset < sz) {
auto read = co_await file->read(block, blockSize, offset);
offset += read;
co_yield StringRef(block, read);
}
while (true) {
co_yield Optional<StringRef>{};
}
}
AsyncGenerator<Optional<StringRef>> readLines(Reference<IAsyncFile> file) {
auto blocks = readBlocks(file, 4 * 1024);
Arena arena;
StringRef lastLine;
loop {
auto optionalBlock = co_await blocks();
if (!optionalBlock.present()) {
if (lastLine.empty()) {
co_yield Optional<StringRef>{};
} else {
co_yield lastLine;
lastLine = {};
arena = Arena();
co_return;
}
}
StringRef block = optionalBlock.get();
auto endsWithNewLine = block.back() == uint8_t('\n');
while (!block.empty()) {
if (!lastLine.empty()) [[unlikely]] {
concatenateStrings(arena, lastLine, block.eatAny("\n"_sr, nullptr));
if (!block.empty() || endsWithNewLine) {
co_yield lastLine;
lastLine = StringRef();
arena = Arena();
}
} else {
auto line = block.eatAny("\n"_sr, nullptr);
if (block.empty() && !endsWithNewLine) {
lastLine = StringRef(arena, line);
} else {
co_yield line;
}
}
}
}
}
Future<Void> testReadLines() {
auto path = "/etc/hosts"s;
auto file = co_await IAsyncFileSystem::filesystem()->open(path, IAsyncFile::OPEN_READWRITE, 0640);
auto lines = readLines(file);
for (int i = 0; true; ++i) {
auto line = co_await lines();
if (!line.present()) {
break;
}
fmt::print("{}: {}\n", i, line.get());
}
}
// readLines -> Stream of lines of a text file
std::unordered_map<std::string, std::function<Future<Void>()>> actors = {
{ "timer", &simpleTimer }, // ./tutorial timer
{ "promiseDemo", &promiseDemo }, // ./tutorial promiseDemo
{ "triggerDemo", &triggerDemo }, // ./tutorial triggerDemo
{ "echoServer", &echoServer }, // ./tutorial -p 6666 echoServer
{ "echoClient", &echoClient }, // ./tutorial -s 127.0.0.1:6666 echoClient
{ "kvStoreServer", &kvStoreServer }, // ./tutorial -p 6666 kvStoreServer
{ "kvSimpleClient", &kvSimpleClient }, // ./tutorial -s 127.0.0.1:6666 kvSimpleClient
{ "multipleClients", &multipleClients }, // ./tutorial -s 127.0.0.1:6666 multipleClients
{ "fdbClientStream", &fdbClientStream }, // ./tutorial -C $CLUSTER_FILE_PATH fdbClientStream
{ "fdbClientGetRange", &fdbClientGetRange }, // ./tutorial -C $CLUSTER_FILE_PATH fdbClientGetRange
{ "fdbClient", &fdbClient }, // ./tutorial -C $CLUSTER_FILE_PATH fdbClient
{ "fdbStatusStresser", &fdbStatusStresser },
{ "testReadLines", &testReadLines }
}; // ./tutorial -C $CLUSTER_FILE_PATH fdbStatusStresser
int main(int argc, char* argv[]) {
bool isServer = false;
std::string port;
std::vector<std::function<Future<Void>()>> toRun;
// parse arguments
for (int i = 1; i < argc; ++i) {
std::string arg(argv[i]);
if (arg == "-p") {
isServer = true;
if (i + 1 >= argc) {
std::cout << "Expecting an argument after -p\n";
return 1;
}
port = std::string(argv[++i]);
continue;
} else if (arg == "-s") {
if (i + 1 >= argc) {
std::cout << "Expecting an argument after -s\n";
return 1;
}
serverAddress = NetworkAddress::parse(argv[++i]);
continue;
} else if (arg == "-C") {
clusterFile = argv[++i];
std::cout << "Using cluster file " << clusterFile << std::endl;
continue;
}
auto actor = actors.find(arg);
if (actor == actors.end()) {
std::cout << format("Error: actor %s does not exist\n", arg.c_str());
return 1;
}
toRun.push_back(actor->second);
}
platformInit();
g_network = newNet2(TLSConfig(), false, true);
FlowTransport::createInstance(!isServer, 0, WLTOKEN_COUNT_IN_TUTORIAL);
NetworkAddress publicAddress = NetworkAddress::parse("0.0.0.0:0");
if (isServer) {
publicAddress = NetworkAddress::parse("0.0.0.0:" + port);
}
try {
if (isServer) {
auto listenError = FlowTransport::transport().bind(publicAddress, publicAddress);
if (listenError.isError()) {
listenError.get();
}
}
} catch (Error& e) {
std::cout << format("Error while binding to address (%d): %s\n", e.code(), e.what());
}
Net2FileSystem::newFileSystem(-10, "");
// now we start the actors
std::vector<Future<Void>> all;
all.reserve(toRun.size());
for (auto& f : toRun) {
all.emplace_back(f());
}
auto f = stopAfter(waitForAll(all));
g_network->run();
return 0;
}

1914
fdbrpc/CoroTests.cpp Normal file

File diff suppressed because it is too large Load Diff

View File

@ -25,6 +25,7 @@
void forceLinkIndexedSetTests();
void forceLinkDequeTests();
void forceLinkFlowTests();
void forceLinkCoroTests();
void forceLinkVersionedMapTests();
void forceLinkMemcpyTests();
void forceLinkMemcpyPerfTests();
@ -95,6 +96,7 @@ struct UnitTestWorkload : TestWorkload {
forceLinkIndexedSetTests();
forceLinkDequeTests();
forceLinkFlowTests();
forceLinkCoroTests();
forceLinkVersionedMapTests();
forceLinkMemcpyTests();
forceLinkMemcpyPerfTests();

View File

@ -124,6 +124,8 @@ foreach(ft flow flow_sampling flowlinktest)
find_library(CORE_FOUNDATION CoreFoundation)
target_link_libraries(${ft} PRIVATE ${IO_KIT} ${CORE_FOUNDATION})
endif()
find_package(Coroutines COMPONENTS Experimental Final REQUIRED)
target_link_libraries(${ft} PUBLIC std::coroutines)
endforeach()
if(OPEN_FOR_IDE)

View File

@ -722,6 +722,40 @@ inline static uint8_t* mutateString(StringRef& s) {
return const_cast<uint8_t*>(s.begin());
}
template <class... StringRefType>
static Standalone<StringRef> concatenateStrings(StringRefType... strs) {
int totalSize = 0;
for (auto const& s : { strs... }) {
totalSize += s.size();
}
Standalone<StringRef> str = makeString(totalSize);
uint8_t* buf = mutateString(str);
for (auto const& s : { strs... }) {
buf = s.copyTo(buf);
}
return str;
}
template <class... StringRefType>
static StringRef concatenateStrings(Arena& arena, StringRefType... strs) {
int totalSize = 0;
for (auto const& s : { strs... }) {
totalSize += s.size();
}
StringRef str = makeString(totalSize, arena);
uint8_t* buf = mutateString(str);
for (auto const& s : { strs... }) {
buf = s.copyTo(buf);
}
return str;
}
template <class Archive>
inline void load(Archive& ar, StringRef& value) {
uint32_t length;

View File

@ -0,0 +1,240 @@
/*
* CoroUtils.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2023 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FLOW_COROUTILS_H
#define FLOW_COROUTILS_H
#include "flow/flow.h"
namespace coro {
template <class Parent, int Idx, class F>
using ConditionalActorCallback = std::conditional_t<GetFutureTypeV<F> == FutureType::Future,
ActorCallback<Parent, Idx, FutureReturnTypeT<F>>,
ActorSingleCallback<Parent, Idx, FutureReturnTypeT<F>>>;
template <class Parent, int Idx, class... Args>
struct ChooseImplCallback;
template <class Parent, int Idx, class F, class... Args>
struct ChooseImplCallback<Parent, Idx, F, Args...>
: ConditionalActorCallback<ChooseImplCallback<Parent, Idx, F, Args...>, Idx, F>,
ChooseImplCallback<Parent, Idx + 1, Args...> {
using ThisCallback = ConditionalActorCallback<ChooseImplCallback<Parent, Idx, F, Args...>, Idx, F>;
using ValueType = FutureReturnTypeT<F>;
static constexpr FutureType futureType = GetFutureTypeV<F>;
[[nodiscard]] Parent* getParent() { return static_cast<Parent*>(this); }
void registerCallbacks() {
if constexpr (futureType == FutureType::Future) {
StrictFuture<ValueType> sf = std::get<Idx>(getParent()->futures);
sf.addCallbackAndClear(static_cast<ThisCallback*>(this));
} else {
auto sf = std::get<Idx>(getParent()->futures);
sf.addCallbackAndClear(static_cast<ThisCallback*>(this));
}
if constexpr (sizeof...(Args) > 0) {
ChooseImplCallback<Parent, Idx + 1, Args...>::registerCallbacks();
}
}
void a_callback_fire(ThisCallback*, ValueType const& value) {
getParent()->removeCallbacks();
try {
std::get<Idx>(getParent()->functions)(value);
getParent()->SAV<Void>::sendAndDelPromiseRef(Void());
} catch (Error& e) {
getParent()->SAV<Void>::sendErrorAndDelPromiseRef(e);
} catch (...) {
getParent()->SAV<Void>::sendErrorAndDelPromiseRef(unknown_error());
}
}
void a_callback_error(ThisCallback*, Error e) {
getParent()->removeCallbacks();
getParent()->SAV<Void>::sendErrorAndDelPromiseRef(e);
}
void removeCallbacks() {
ThisCallback::remove();
if constexpr (sizeof...(Args) > 0) {
ChooseImplCallback<Parent, Idx + 1, Args...>::removeCallbacks();
}
}
};
template <class Parent, int Idx>
struct ChooseImplCallback<Parent, Idx> {
#ifdef ENABLE_SAMPLING
LineageReference* lineageAddr() { return currentLineage; }
#endif
};
template <class... Args>
struct ChooseImplActor final : Actor<Void>,
ChooseImplCallback<ChooseImplActor<Args...>, 0, Args...>,
FastAllocated<ChooseImplActor<Args...>> {
std::tuple<Args...> futures;
std::tuple<std::function<void(FutureReturnTypeT<Args> const&)>...> functions;
using FastAllocated<ChooseImplActor<Args...>>::operator new;
using FastAllocated<ChooseImplActor<Args...>>::operator delete;
ChooseImplActor(std::tuple<Args...>&& futures,
std::tuple<std::function<void(FutureReturnTypeT<Args> const&)>...>&& functions)
: Actor<Void>(), futures(futures), functions(functions) {
ChooseImplCallback<ChooseImplActor<Args...>, 0, Args...>::registerCallbacks();
}
void cancel() override {
auto waitState = actor_wait_state;
actor_wait_state = -1;
if (waitState) {
ChooseImplCallback<ChooseImplActor<Args...>, 0, Args...>::removeCallbacks();
SAV<Void>::sendErrorAndDelPromiseRef(actor_cancelled());
}
}
void destroy() override { delete this; }
};
template <class... Args>
class ChooseClause {
std::tuple<Args...> futures;
std::tuple<std::function<void(FutureReturnTypeT<Args> const&)>...> functions;
bool noop = false;
template <class T, bool IsStream>
auto getNoop() {
using FType = std::conditional_t<IsStream, FutureStream<T>, Future<T>>;
return ChooseClause<Args..., FType>(std::tuple_cat(std::move(futures), std::make_tuple(FType())), true);
}
template <class T>
auto getNoop(const Future<T>& future) {
return ChooseClause<Args..., Future<T> const&>(
std::tuple_cat(std::move(futures), std::make_tuple(std::cref(future))), true);
}
template <class T>
auto getNoop(const FutureStream<T>& future) {
return ChooseClause<Args..., FutureStream<T> const&>(
std::tuple_cat(std::move(futures), std::make_tuple(std::cref(future))), true);
}
public:
ChooseClause(std::tuple<Args...>&& futures,
std::tuple<std::function<void(FutureReturnTypeT<Args> const&)>...>&& functions)
: futures(std::move(futures)), functions(std::move(functions)) {}
explicit ChooseClause(std::tuple<Args...>&& futures, bool noop = false) : futures(std::move(futures)), noop(noop) {}
explicit ChooseClause() : futures(std::tuple<>()) {}
auto When(std::invocable auto futureCallback, std::invocable<decltype(futureCallback().get())> auto fun) {
using FType = decltype(futureCallback());
using FReturnType = FutureReturnTypeT<FType>;
constexpr bool isStream = GetFutureTypeV<FType> == FutureType::FutureStream;
using ArgType = std::conditional_t<isStream, FutureStream<FReturnType>, Future<FReturnType>>;
if (noop) {
return getNoop<FReturnType, isStream>();
}
auto future = futureCallback();
if (future.isReady()) {
fun(future.get());
return getNoop<FReturnType, isStream>();
}
std::function<void(FReturnType const&)> function = fun;
return ChooseClause<Args..., ArgType>(std::tuple_cat(std::move(futures), std::make_tuple(future)),
std::tuple_cat(std::move(functions), std::make_tuple(function)));
}
template <class T>
auto When(Future<T> const& future, std::invocable<T const&> auto fun) {
static_assert(std::is_same_v<decltype(fun(std::declval<T const&>())), void>,
"When-handler must return void (and can't be awaitable)");
if (noop) {
return getNoop(future);
}
if (future.isReady()) {
fun(future.get());
return getNoop(future);
}
std::function<void(T const&)> function = fun;
return ChooseClause<Args..., Future<T> const&>(
std::tuple_cat(std::move(futures), std::make_tuple(std::cref(future))),
std::tuple_cat(std::move(functions), std::make_tuple(function)));
}
template <class T>
auto When(FutureStream<T> const& futureStream, std::invocable<T const&> auto fun) {
static_assert(std::is_same_v<decltype(fun(std::declval<T const&>())), void>,
"When-handler must return void (and't can't be awaitable)");
if (noop) {
return getNoop(futureStream);
}
if (futureStream.isReady()) {
auto fs = futureStream;
fun(fs.pop());
return getNoop(futureStream);
}
std::function<void(T const&)> function = fun;
return ChooseClause<Args..., FutureStream<T> const&>(
std::tuple_cat(std::move(futures), std::make_tuple(std::cref(futureStream))),
std::tuple_cat(std::move(functions), std::make_tuple(function)));
}
[[nodiscard]] Future<Void> run() {
if (noop) {
return Void();
}
return Future<Void>(new ChooseImplActor<Args...>(std::move(futures), std::move(functions)));
}
};
} // namespace coro
using Choose = coro::ChooseClause<>;
template <class T, class F>
AsyncGenerator<T> map(AsyncGenerator<T> gen, F pred) {
while (gen) {
auto val = co_await gen();
if (pred(val)) {
co_yield val;
}
}
}
template <class T>
AsyncGenerator<T> toGenerator(FutureStream<T> stream) {
while (true) {
try {
co_yield co_await stream;
} catch (Error& e) {
if (e.code() == error_code_end_of_stream) {
co_return;
}
throw;
}
}
}
#endif // FLOW_COROUTILS_H

View File

@ -0,0 +1,167 @@
/*
* Coroutines.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2023 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FLOW_COROUTINES_H
#define FLOW_COROUTINES_H
#pragma once
#if __has_include(<coroutine>)
#include <coroutine>
namespace n_coroutine = ::std;
#elif __has_include(<experimental/coroutine>)
#include <experimental/coroutine>
namespace n_coroutine = ::std::experimental;
#endif
#include <concepts>
#include <array>
#include <cstring>
#include "flow/flow.h"
#include "flow/Error.h"
#include "flow/CoroutinesImpl.h"
struct Uncancellable {};
template <class T>
class AsyncGenerator {
PromiseStream<T>* nextPromise;
n_coroutine::coroutine_handle<> handle;
public:
explicit AsyncGenerator(PromiseStream<T>* promise, n_coroutine::coroutine_handle<> handle)
: nextPromise(promise), handle(handle) {}
~AsyncGenerator() { handle.destroy(); }
Future<T> operator()() {
handle.resume();
Error error;
T res;
try {
res = co_await nextPromise->getFuture();
} catch (Error& e) {
error = e;
} catch (...) {
error = unknown_error();
}
co_await delay(0);
if (error.isValid()) {
throw error;
}
co_return res;
}
explicit operator bool() { return !handle.done(); }
};
// Inspired from https://www.scs.stanford.edu/~dm/blog/c++-coroutines.html
template <typename T>
class Generator {
public: // types
using promise_type = coro::GeneratorPromise<T>;
using handle_type = n_coroutine::coroutine_handle<promise_type>;
using value_type = T;
using difference_type = std::ptrdiff_t;
private:
handle_type handle;
public:
explicit Generator(handle_type h) : handle(h) {}
Generator() {}
Generator(Generator const& other) : handle(other.handle) {
if (handle) {
handle.promise().addRef();
}
}
Generator(Generator&& other) : handle(std::move(other.handle)) { other.handle = handle_type{}; }
~Generator() {
if (handle) {
handle.promise().delRef();
}
}
Generator& operator=(Generator const& other) {
if (handle) {
handle.promise().delRef();
}
handle = other.handle;
if (handle) {
handle.promise().addRef();
}
return *this;
}
Generator& operator=(Generator&& other) {
if (handle) {
handle.promise().delRef();
}
handle = std::move(other.handle);
other.handle = handle_type{};
return *this;
}
explicit operator bool() { return handle && !handle.done(); }
static Generator<T> end() { return Generator<T>{}; }
bool operator==(Generator<T> const& other) const {
bool selfValid = handle && !handle.done();
bool otherValid = other.handle && !other.handle.done();
if (selfValid == otherValid) {
// if both generator are done, we consider them the same, otherwise they are only the same if both point to
// the same coroutine
return !selfValid || handle == other.handle;
}
return false;
}
bool operator!=(Generator<T> const& other) const { return !(*this == other); }
const T& operator*() const& {
auto& promise = handle.promise();
if (promise.error.isValid()) {
throw promise.error;
}
ASSERT(promise.value.has_value());
return promise.value.value();
}
Generator& operator++() {
handle.resume();
return *this;
}
void operator++(int) { ++(*this); }
};
template <typename ReturnValue, typename... Args>
struct [[maybe_unused]] n_coroutine::coroutine_traits<Future<ReturnValue>, Args...> {
using promise_type = coro::CoroPromise<ReturnValue, !coro::hasUncancellable<Args...>>;
};
template <typename ReturnValue, typename... Args>
struct [[maybe_unused]] n_coroutine::coroutine_traits<AsyncGenerator<ReturnValue>, Args...> {
static_assert(!coro::hasUncancellable<Args...>, "AsyncGenerator can't be uncancellable");
using promise_type = coro::AsyncGeneratorPromise<ReturnValue>;
};
#endif // FLOW_COROUTINES_H

View File

@ -0,0 +1,498 @@
/*
* CoroutinesImpl.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2023 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FLOW_COROUTINESIMPL_H
#define FLOW_COROUTINESIMPL_H
#include "flow/flow.h"
template <class T>
class Generator;
template <class T>
class AsyncGenerator;
struct Uncancellable;
namespace coro {
template <class F>
struct FutureReturnType;
template <class T>
struct FutureReturnType<Future<T>> {
using type = T;
};
template <class T>
struct FutureReturnType<FutureStream<T>> {
using type = T;
};
template <class T>
struct FutureReturnType<Future<T> const&> {
using type = T;
};
template <class T>
struct FutureReturnType<FutureStream<T> const&> {
using type = T;
};
template <class F>
using FutureReturnTypeT = typename FutureReturnType<F>::type;
enum class FutureType { FutureStream, Future };
template <class F>
struct GetFutureType;
template <class T>
struct GetFutureType<Future<T>> {
constexpr static FutureType value = FutureType::Future;
};
template <class T>
struct GetFutureType<FutureStream<T>> {
constexpr static FutureType value = FutureType::FutureStream;
};
template <class T>
struct GetFutureType<Future<T> const&> {
constexpr static FutureType value = FutureType::Future;
};
template <class T>
struct GetFutureType<FutureStream<T> const&> {
constexpr static FutureType value = FutureType::FutureStream;
};
template <class F>
inline constexpr FutureType GetFutureTypeV = GetFutureType<F>::value;
template <class T, bool IsCancellable>
struct CoroActor final : Actor<std::conditional_t<std::is_void_v<T>, Void, T>> {
using ValType = std::conditional_t<std::is_void_v<T>, Void, T>;
static void* operator new(size_t s) { return allocateFast(int(s)); }
static void operator delete(void* p, size_t s) { freeFast(int(s), p); }
n_coroutine::coroutine_handle<> handle;
int8_t& waitState() { return Actor<ValType>::actor_wait_state; }
template <class U>
void set(U&& value) {
new (&SAV<ValType>::value()) ValType(std::forward<U>(value));
SAV<ValType>::error_state = Error(SAV<ValType>::SET_ERROR_CODE);
}
void setError(Error const& e) { SAV<ValType>::error_state = e; }
void cancel() override {
if constexpr (IsCancellable) {
auto prev_wait_state = Actor<ValType>::actor_wait_state;
// Set wait state to -1
Actor<ValType>::actor_wait_state = -1;
// If the actor is waiting, then resume the coroutine to throw actor_cancelled().
if (prev_wait_state > 0) {
handle.resume();
}
}
}
void destroy() override { delete this; }
};
template <class U>
struct AwaitableFutureStore {
std::variant<Error, U> data;
constexpr bool isSet() const noexcept { return data.index() != 0 || std::get<0>(data).isValid(); }
void copy(U v) { data = std::move(v); }
void set(U&& v) { data = std::move(v); }
void set(U const& v) { data = v; }
const U& getRef() const {
switch (data.index()) {
case 0:
throw std::get<0>(data);
case 1:
return std::get<1>(data);
}
UNREACHABLE();
}
U&& get() && {
switch (data.index()) {
case 0:
throw std::get<0>(data);
case 1:
return std::get<1>(std::move(data));
}
UNREACHABLE();
}
};
template <class T>
using ToFutureVal = std::conditional_t<std::is_void_v<T>, Void, T>;
template <class F, class U, bool IsStream>
struct AwaitableResume;
template <class F>
struct AwaitableResume<F, Void, false> {
[[maybe_unused]] void await_resume() {
auto self = static_cast<F*>(this);
self->resumeImpl();
if (self->future.isError()) {
throw self->future.getError();
}
}
};
template <class F, class T>
struct AwaitableResume<F, T, false> {
T const& await_resume() {
auto self = static_cast<F*>(this);
if (self->resumeImpl()) {
if (self->future.isError()) {
throw self->future.getError();
}
return self->future.get();
}
return self->store.getRef();
}
};
template <class F, class T>
struct AwaitableResume<F, T, true> {
T await_resume() {
auto self = static_cast<F*>(this);
if (self->resumeImpl()) {
if (self->future.isError()) {
throw self->future.getError();
}
return self->future.pop();
}
return std::move(self->store).get();
}
};
template <class promise_type, class U, bool IsStream>
struct AwaitableFuture : std::conditional_t<IsStream, SingleCallback<ToFutureVal<U>>, Callback<ToFutureVal<U>>>,
AwaitableResume<AwaitableFuture<promise_type, U, IsStream>, U, IsStream> {
using FutureValue = ToFutureVal<U>;
using FutureType = std::conditional_t<IsStream, FutureStream<FutureValue>, Future<FutureValue> const&>;
FutureType future;
promise_type* pt = nullptr;
AwaitableFutureStore<FutureValue> store;
AwaitableFuture(const FutureType& f, promise_type* pt) : future(f), pt(pt) {}
void fire(FutureValue const& value) override {
store.set(value);
pt->resume();
}
void fire(FutureValue&& value) override {
store.set(std::move(value));
pt->resume();
}
void error(Error error) override {
store.data = error;
pt->resume();
}
[[maybe_unused]] [[nodiscard]] bool await_ready() const {
if (pt->waitState() < 0) {
pt->waitState() = -2;
// actor was cancelled
return true;
}
return future.isReady();
}
[[maybe_unused]] void await_suspend(n_coroutine::coroutine_handle<> h) {
// Create a coroutine callback if it's the first time being suspended
pt->setHandle(h);
// Set wait_state and add callback
pt->waitState() = 1;
if constexpr (IsStream) {
auto sf = future;
sf.addCallbackAndClear(this);
} else {
StrictFuture<FutureValue> sf = future;
sf.addCallbackAndClear(this);
}
}
bool resumeImpl() {
// If actor is cancelled, then throw actor_cancelled()
switch (pt->waitState()) {
case -1:
this->remove();
case -2:
// -2 means that the `await_suspend` call returned `true`, so we shouldn't remove the callback.
// if the wait_state is -1 we still have to throw, so we fall through to the -2 case
throw actor_cancelled();
}
bool wasReady = pt->waitState() == 0;
// Actor return from waiting, remove callback and reset wait_state.
if (pt->waitState() > 0) {
this->remove();
pt->waitState() = 0;
}
return wasReady;
}
};
template <class T, bool>
struct ActorMember {
T* member;
explicit ActorMember(n_coroutine::coroutine_handle<> handle) : member(new T(handle)) {}
T* ptr() { return member; }
T* operator->() { return member; }
const T* operator->() const { return member; }
};
template <class T>
struct ActorMember<T, true> {
T member;
explicit ActorMember(n_coroutine::coroutine_handle<> handle) : member(handle) {}
T* ptr() { return &member; }
T* operator->() { return &member; }
const T* operator->() const { return &member; }
};
template <class T, class Promise>
struct CoroReturn {
template <class U>
void return_value(U&& value) {
static_cast<Promise*>(this)->coroActor->set(std::forward<U>(value));
}
};
template <class Promise>
struct CoroReturn<Void, Promise> {
void return_void() { static_cast<Promise*>(this)->coroActor->set(Void()); }
};
template <class T, bool IsCancellable>
struct CoroPromise : CoroReturn<T, CoroPromise<T, IsCancellable>> {
using promise_type = CoroPromise<T, IsCancellable>;
using ActorType = coro::CoroActor<T, IsCancellable>;
using ReturnValue = std::conditional_t<std::is_void_v<T>, Void, T>;
using ReturnFutureType = Future<ReturnValue>;
ActorType* coroActor;
CoroPromise() : coroActor(new ActorType()) {}
n_coroutine::coroutine_handle<promise_type> handle() {
return n_coroutine::coroutine_handle<promise_type>::from_promise(*this);
}
static void* operator new(size_t s) { return allocateFast(int(s)); }
static void operator delete(void* p, size_t s) { freeFast(int(s), p); }
ReturnFutureType get_return_object() noexcept { return ReturnFutureType(coroActor); }
[[nodiscard]] n_coroutine::suspend_never initial_suspend() const noexcept { return {}; }
auto final_suspend() noexcept {
struct FinalAwaitable {
ActorType* sav;
// for debugging output only
explicit FinalAwaitable(ActorType* sav) : sav(sav) {}
[[nodiscard]] bool await_ready() const noexcept { return true; }
void await_resume() const noexcept {
if (sav->isError()) {
sav->finishSendErrorAndDelPromiseRef();
} else {
sav->finishSendAndDelPromiseRef();
}
}
constexpr void await_suspend(n_coroutine::coroutine_handle<>) const noexcept {}
};
return FinalAwaitable(coroActor);
}
void unhandled_exception() {
// The exception should always be type Error.
try {
std::rethrow_exception(std::current_exception());
} catch (const Error& error) {
// if (Actor<ReturnValue>::actor_wait_state == -1 && error.code() == error_code_operation_cancelled) {
// return;
// }
coroActor->setError(error);
// SAV<ReturnValue>::sendErrorAndDelPromiseRef(error);
} catch (...) {
coroActor->setError(unknown_error());
// SAV<ReturnValue>::sendErrorAndDelPromiseRef(unknown_error());
}
}
void setHandle(n_coroutine::coroutine_handle<> h) { coroActor->handle = h; }
void resume() { coroActor->handle.resume(); }
int8_t& waitState() { return coroActor->waitState(); }
template <class U>
auto await_transform(const Future<U>& future) {
return coro::AwaitableFuture<promise_type, U, false>{ future, this };
}
template <class U>
auto await_transform(const FutureStream<U>& futureStream) {
return coro::AwaitableFuture<promise_type, U, true>{ futureStream, this };
}
};
template <class T>
struct GeneratorPromise {
using handle_type = n_coroutine::coroutine_handle<GeneratorPromise<T>>;
static void* operator new(size_t s) { return allocateFast(int(s)); }
static void operator delete(void* p, size_t s) { freeFast(int(s), p); }
Error error;
std::optional<T> value;
mutable unsigned refCount = 1;
void addRef() const { refCount += 1; }
void delRef() const {
if (--refCount == 0) {
const_cast<GeneratorPromise<T>*>(this)->handle().destroy();
}
}
n_coroutine::suspend_never initial_suspend() { return {}; }
n_coroutine::suspend_always final_suspend() noexcept { return {}; }
auto handle() { return handle_type::from_promise(*this); }
Generator<T> get_return_object() { return Generator(handle_type::from_promise(*this)); }
void unhandled_exception() {
try {
std::rethrow_exception(std::current_exception());
} catch (Error& e) {
error = e;
} catch (...) {
error = unknown_error();
}
}
template <std::convertible_to<T> From> // C++20 concept
n_coroutine::suspend_always yield_value(From&& from) {
value = std::forward<From>(from);
return {};
}
void return_void() {}
};
template <class T>
struct AsyncGeneratorPromise {
using promise_type = AsyncGeneratorPromise<T>;
static void* operator new(size_t s) { return allocateFast(int(s)); }
static void operator delete(void* p, size_t s) { freeFast(int(s), p); }
n_coroutine::coroutine_handle<promise_type> handle() {
return n_coroutine::coroutine_handle<promise_type>::from_promise(*this);
}
[[nodiscard]] n_coroutine::suspend_always initial_suspend() const noexcept { return {}; }
[[nodiscard]] n_coroutine::suspend_always final_suspend() const noexcept { return {}; }
AsyncGenerator<T> get_return_object() { return AsyncGenerator<T>(&nextPromise, handle()); }
void return_void() { nextPromise.sendError(end_of_stream()); }
void unhandled_exception() {
// The exception should always be type Error.
try {
std::rethrow_exception(std::current_exception());
} catch (const Error& error) {
nextPromise.sendError(error);
} catch (...) {
nextPromise.sendError(unknown_error());
}
}
template <std::convertible_to<T> U>
n_coroutine::suspend_always yield_value(U&& value) {
nextPromise.send(std::forward<U>(value));
return {};
}
void setHandle(n_coroutine::coroutine_handle<> h) { mHandle = h; }
int8_t& waitState() { return mWaitState; }
void resume() { mHandle.resume(); }
template <class U>
auto await_transform(const Future<U>& future) {
return coro::AwaitableFuture<promise_type, U, false>{ future, this };
}
template <class U>
auto await_transform(const FutureStream<U>& futureStream) {
return coro::AwaitableFuture<promise_type, U, true>{ futureStream, this };
}
n_coroutine::coroutine_handle<> mHandle;
PromiseStream<T> nextPromise;
int8_t mWaitState = 0;
};
template <class... Args>
struct HasUncancellable;
template <>
struct HasUncancellable<> {
static constexpr bool value = false;
};
template <class First, class... Args>
struct HasUncancellable<First, Args...> {
static constexpr bool value = HasUncancellable<Args...>::value;
};
template <class... Rest>
struct HasUncancellable<Uncancellable, Rest...> {
static constexpr bool value = true;
};
template <class... Args>
inline constexpr bool hasUncancellable = HasUncancellable<Args...>::value;
} // namespace coro
#endif // FLOW_COROUTINESIMPL_H

View File

@ -874,6 +874,20 @@ public:
destroy();
}
// this is only used for C++ coroutines
void finishSendErrorAndDelPromiseRef() {
if (promises == 1 && !futures) {
// No one is left to receive the value, so we can just die
destroy();
return;
}
while (Callback<T>::next != this)
Callback<T>::next->error(this->error_state);
if (!--promises && !futures)
destroy();
}
void addPromiseRef() { promises++; }
void addFutureRef() { futures++; }
@ -1610,5 +1624,6 @@ void bindDeterministicRandomToOpenssl();
#pragma clang diagnostic pop
#endif
#include "flow/Coroutines.h"
#include "flow/genericactors.actor.h"
#endif

View File

@ -40,6 +40,7 @@
#include <utility>
#include "flow/flow.h"
#include "flow/CoroUtils.h"
#include "flow/Knobs.h"
#include "flow/Util.h"
#include "flow/IndexedSet.h"

View File

@ -115,6 +115,7 @@ struct NetworkInfo {
FlowLock* handshakeLock;
NetworkInfo();
~NetworkInfo();
};
class IEventFD : public ReferenceCounted<IEventFD> {

View File

@ -448,3 +448,6 @@ TEST_CASE("/flow/network/ipV6Preferred") {
}
NetworkInfo::NetworkInfo() : handshakeLock(new FlowLock(FLOW_KNOBS->TLS_HANDSHAKE_LIMIT)) {}
NetworkInfo::~NetworkInfo() {
delete handshakeLock;
}