From a2bb5208470b339cb13ff209cca7fabea9b5bf48 Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Fri, 8 Dec 2023 17:46:21 +0100 Subject: [PATCH] Fix cancellation bug in Choose class --- fdbrpc/CoroTests.cpp | 44 +++++++++++++++++++++++++++++++++++ flow/include/flow/CoroUtils.h | 7 ++++-- 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/fdbrpc/CoroTests.cpp b/fdbrpc/CoroTests.cpp index 413269e124..b95027028c 100644 --- a/fdbrpc/CoroTests.cpp +++ b/fdbrpc/CoroTests.cpp @@ -1914,3 +1914,47 @@ TEST_CASE("/flow/coro/actor") { co_await futureStreamTest(); co_await stackMemoryTest(); } + +TEST_CASE("/flow/coro/chooseCancelWaiting") { + Promise voidPromise; + Promise intPromise; + Future chooseFuture = Choose() + .When(voidPromise.getFuture(), [](const Void&) { ASSERT_ABORT(false); }) + .When(intPromise.getFuture(), [](const int&) { ASSERT_ABORT(false); }) + .run(); + chooseFuture.cancel(); + ASSERT(chooseFuture.getError().code() == error_code_actor_cancelled); + voidPromise.send(Void()); + intPromise.sendError(end_of_stream()); + ASSERT(chooseFuture.getError().code() == error_code_actor_cancelled); + return Void(); +} + +TEST_CASE("/flow/coro/chooseCancelReady") { + Promise intPromise; + int res = 0; + Future chooseFuture = Choose().When(intPromise.getFuture(), [&](const int& val) { res = val; }).run(); + intPromise.send(5); + ASSERT(chooseFuture.isReady()); + ASSERT(res == 5); + chooseFuture.cancel(); + ASSERT(chooseFuture.isReady()); + return Void(); +} + +TEST_CASE("/flow/coro/chooseRepeatedCancel") { + Promise voidPromise; + Promise intPromise; + Future chooseFuture = Choose() + .When(voidPromise.getFuture(), [](const Void&) { ASSERT_ABORT(false); }) + .When(intPromise.getFuture(), [](const int&) { ASSERT_ABORT(false); }) + .run(); + chooseFuture.cancel(); + ASSERT(chooseFuture.getError().code() == error_code_actor_cancelled); + chooseFuture.cancel(); + ASSERT(chooseFuture.getError().code() == error_code_actor_cancelled); + voidPromise.sendError(end_of_stream()); + intPromise.send(3); + ASSERT(chooseFuture.getError().code() == error_code_actor_cancelled); + return Void(); +} \ No newline at end of file diff --git a/flow/include/flow/CoroUtils.h b/flow/include/flow/CoroUtils.h index dd7ee4ac9c..65547f19d7 100644 --- a/flow/include/flow/CoroUtils.h +++ b/flow/include/flow/CoroUtils.h @@ -58,6 +58,7 @@ struct ChooseImplCallback } void a_callback_fire(ThisCallback*, ValueType const& value) { + getParent()->actor_wait_state = 0; getParent()->removeCallbacks(); try { std::get(getParent()->functions)(value); @@ -70,6 +71,7 @@ struct ChooseImplCallback } void a_callback_error(ThisCallback*, Error e) { + getParent()->actor_wait_state = 0; getParent()->removeCallbacks(); getParent()->SAV::sendErrorAndDelPromiseRef(e); } @@ -103,12 +105,13 @@ struct ChooseImplActor final : Actor, std::tuple const&)>...>&& functions) : Actor(), futures(futures), functions(functions) { ChooseImplCallback, 0, Args...>::registerCallbacks(); + actor_wait_state = 1; } void cancel() override { - auto waitState = actor_wait_state; + const auto waitState = actor_wait_state; actor_wait_state = -1; - if (waitState) { + if (waitState > 0) { ChooseImplCallback, 0, Args...>::removeCallbacks(); SAV::sendErrorAndDelPromiseRef(actor_cancelled()); }