Address review comments and fix serious bug

This commit is contained in:
Markus Pilman 2023-11-17 11:08:34 +01:00
parent f38ae571b0
commit fd4a300d4c
4 changed files with 29 additions and 28 deletions

View File

@ -1,6 +1,7 @@
# Coroutines in Flow # Coroutines in Flow
* [Introduction](#Introduction) * [Introduction](#Introduction)
* [Coroutines vs ACTORs](#coroutines-vs-actors)
* [Basic Types](#basic-types) * [Basic Types](#basic-types)
* [Choose-When](#choose-when) * [Choose-When](#choose-when)
* [Execution in when-expressions](#execution-in-when-expressions) * [Execution in when-expressions](#execution-in-when-expressions)
@ -38,6 +39,13 @@ Future<double> simpleCoroutine() {
This document assumes some familiarity with Flow. As of today, actors and coroutines This document assumes some familiarity with Flow. As of today, actors and coroutines
can be freely mixed, but new code should be written using coroutines. can be freely mixed, but new code should be written using coroutines.
## Coroutines vs ACTORs
It is important to understand that C++ coroutine support doesn't change anything in Flow: they are not a replacement
of Flow but they replace the actor compiler with a C++ compiler. This means, that the network loop, all Flow types,
the RPC layer, and the simulator all remain unchanged. A coroutine simply returns a special `SAV<T>` which has handle
to a coroutine.
## Basic Types ## Basic Types
As defined in the C++20 standard, a function is a coroutine if its body contains at least one `co_await`, `co_yield`, As defined in the C++20 standard, a function is a coroutine if its body contains at least one `co_await`, `co_yield`,
@ -624,8 +632,8 @@ struct Foo : IFoo {
}; };
``` ```
This boilerplate is necessary, because `ACTOR`s can't be class members since the actual code is moved into a different This boilerplate is necessary, because `ACTOR`s can't be class members: the actor compiler will generate another
struct which. `struct` and move the code there -- so `this` will point to the actor state and not to the class instance.
With C++ coroutines, this limitation goes away. So a cleaner (and slightly more efficient) implementation of the above With C++ coroutines, this limitation goes away. So a cleaner (and slightly more efficient) implementation of the above
is: is:
@ -685,4 +693,8 @@ Future<Void> someActor() {
If the struct `SomeStruct` would initialize its primitive members explicitly (for example by using `int a = 0;` and If the struct `SomeStruct` would initialize its primitive members explicitly (for example by using `int a = 0;` and
`bool b = false`) this would be a non-issue. And explicit initialization is probably the right fix here. Sadly, it `bool b = false`) this would be a non-issue. And explicit initialization is probably the right fix here. Sadly, it
doesn't seem like UBSAN finds these kind of subtle bugs. doesn't seem like UBSAN finds these kind of subtle bugs.
Another difference is, that if a `state` variables might be initialized twice: once at the creation of the actor using
the default constructor and a second time at the point where the variable is initialized in the code. With C++
coroutines we now get the expected behavior, which is better, but nonetheless a potential behavior change.

View File

@ -31,6 +31,9 @@
#include <memory> #include <memory>
#include <iostream> #include <iostream>
using namespace std::literals::string_literals;
using namespace std::literals::string_view_literals;
NetworkAddress serverAddress; NetworkAddress serverAddress;
enum TutorialWellKnownEndpoints { enum TutorialWellKnownEndpoints {
@ -44,12 +47,6 @@ enum TutorialWellKnownEndpoints {
Future<Void> simpleTimer() { Future<Void> simpleTimer() {
// we need to remember the time when we first // we need to remember the time when we first
// started. // started.
// This needs to be a state-variable because
// we will use it in different parts of the
// actor. If you don't understand how state
// variables work, it is a good idea to remove
// the state keyword here and look at the
// generated C++ code from the actor compiler.
double start_time = g_network->now(); double start_time = g_network->now();
loop { loop {
co_await delay(1.0); co_await delay(1.0);
@ -432,12 +429,8 @@ Future<Void> fdbClientStream() {
Key next; Key next;
int64_t bytes = 0; int64_t bytes = 0;
Future<Void> logFuture = logThroughput(&bytes, &next); Future<Void> logFuture = logThroughput(&bytes, &next);
Future<Void> onError;
loop { loop {
if (onError.isValid()) { Future<Void> onError;
co_await onError;
onError = Future<Void>();
}
PromiseStream<Standalone<RangeResultRef>> results; PromiseStream<Standalone<RangeResultRef>> results;
try { try {
Future<Void> stream = tx.getRangeStream(results, Future<Void> stream = tx.getRangeStream(results,
@ -457,6 +450,7 @@ Future<Void> fdbClientStream() {
} }
onError = tx.onError(e); onError = tx.onError(e);
} }
co_await onError;
} }
} }
@ -470,13 +464,9 @@ bool transaction_done(void) {
template <class DB, class Fun> template <class DB, class Fun>
Future<Void> runTransactionWhile(DB const& db, Fun f) { Future<Void> runTransactionWhile(DB const& db, Fun f) {
Future<Void> onError;
Transaction tr(db); Transaction tr(db);
loop { loop {
if (onError.isValid()) { Future<Void> onError;
co_await onError;
onError = Future<Void>();
}
try { try {
if (transactionDone(co_await f(&tr))) { if (transactionDone(co_await f(&tr))) {
co_return; co_return;
@ -484,6 +474,7 @@ Future<Void> runTransactionWhile(DB const& db, Fun f) {
} catch (Error& e) { } catch (Error& e) {
onError = tr.onError(e); onError = tr.onError(e);
} }
co_await onError;
} }
} }
@ -603,6 +594,7 @@ AsyncGenerator<Optional<StringRef>> readLines(Reference<IAsyncFile> file) {
co_yield lastLine; co_yield lastLine;
lastLine = {}; lastLine = {};
arena = Arena(); arena = Arena();
co_return;
} }
} }
StringRef block = optionalBlock.get(); StringRef block = optionalBlock.get();
@ -625,13 +617,10 @@ AsyncGenerator<Optional<StringRef>> readLines(Reference<IAsyncFile> file) {
} }
} }
} }
if (!lastLine.empty()) {
co_yield lastLine;
}
} }
Future<Void> testReadLines() { Future<Void> testReadLines() {
auto path = "/etc/hosts"s;
auto file = co_await IAsyncFileSystem::filesystem()->open( auto file = co_await IAsyncFileSystem::filesystem()->open(
"/Users/mpilman/Projects/frostdb/flow/include/flow/flow.h", IAsyncFile::OPEN_READWRITE, 0640); "/Users/mpilman/Projects/frostdb/flow/include/flow/flow.h", IAsyncFile::OPEN_READWRITE, 0640);
auto lines = readLines(file); auto lines = readLines(file);

View File

@ -1351,6 +1351,8 @@ Future<Void> actor_throw_test(std::stringstream& ss) {
LifetimeLogger ll(ss, 0); LifetimeLogger ll(ss, 0);
co_await delay(0);
throw io_error(); throw io_error();
ss << "after throw. "; ss << "after throw. ";

View File

@ -163,12 +163,10 @@ template <class F>
struct AwaitableResume<F, Void, false> { struct AwaitableResume<F, Void, false> {
[[maybe_unused]] void await_resume() { [[maybe_unused]] void await_resume() {
auto self = static_cast<F*>(this); auto self = static_cast<F*>(this);
if (self->resumeImpl()) { self->resumeImpl();
if (self->future.isError()) { if (self->future.isError()) {
throw self->future.getError(); throw self->future.getError();
}
} }
return;
} }
}; };