docs: full docs and doctests for Action/MultiAction

This commit is contained in:
Greg Johnston 2024-05-12 21:05:09 -04:00
parent 26cf4848db
commit fd048295a4
2 changed files with 831 additions and 49 deletions

View File

@ -1,5 +1,5 @@
use crate::{
computed::ArcMemo,
computed::{ArcMemo, Memo},
diagnostics::is_suppressing_resource_load,
owner::{Owner, StoredValue},
signal::{ArcRwSignal, RwSignal},
@ -10,7 +10,7 @@ use any_spawner::Executor;
use futures::{channel::oneshot, select, FutureExt};
use std::{future::Future, panic::Location, pin::Pin, sync::Arc};
/// An action run some asynchronous code when you dispatch a new value to it, and gives you
/// An action runs some asynchronous code when you dispatch a new value to it, and gives you
/// reactive access to the result.
///
/// Actions are intended for mutating or updating data, not for loading data. If you find yourself
@ -86,6 +86,7 @@ use std::{future::Future, panic::Location, pin::Pin, sync::Arc};
///
/// // if there are multiple arguments, use a tuple
/// let action3 = ArcAction::new(|input: &(usize, String)| async { todo!() });
/// ```
pub struct ArcAction<I, O>
where
I: 'static,
@ -126,7 +127,7 @@ where
I: Send + Sync + 'static,
O: Send + Sync + 'static,
{
/// Creates a new action. Thi is lazy: it does not run the action function until some value
/// Creates a new action. This is lazy: it does not run the action function until some value
/// is dispatched.
///
/// The constructor takes a function which will create a new `Future` from some input data.
@ -185,71 +186,168 @@ where
#[track_caller]
pub fn dispatch(&self, input: I) {
//if !is_suppressing_resource_load() {
let mut fut = (self.action_fn)(&input).fuse();
if !is_suppressing_resource_load() {
let mut fut = (self.action_fn)(&input).fuse();
// abort this task if the owner is cleaned up
let (abort_tx, mut abort_rx) = oneshot::channel();
Owner::on_cleanup(move || {
abort_tx.send(()).expect(
"tried to cancel a future in ArcAction::dispatch(), but the \
channel has already closed",
);
});
// abort this task if the owner is cleaned up
let (abort_tx, mut abort_rx) = oneshot::channel();
Owner::on_cleanup(move || {
abort_tx.send(()).expect(
"tried to cancel a future in ArcAction::dispatch(), but \
the channel has already closed",
);
});
// Update the state before loading
self.in_flight.update(|n| *n += 1);
let current_version =
self.version.try_get_untracked().unwrap_or_default();
self.input.try_update(|inp| *inp = Some(input));
// Update the state before loading
self.in_flight.update(|n| *n += 1);
let current_version =
self.version.try_get_untracked().unwrap_or_default();
self.input.try_update(|inp| *inp = Some(input));
// Spawn the task
Executor::spawn({
let input = self.input.clone();
let version = self.version.clone();
let value = self.value.clone();
let in_flight = self.in_flight.clone();
async move {
select! {
// if the abort message has been sent, bail and do nothing
_ = abort_rx => {
in_flight.update(|n| *n = n.saturating_sub(1));
},
// otherwise, update the value
result = fut => {
in_flight.update(|n| *n = n.saturating_sub(1));
let is_latest = version.get_untracked() <= current_version;
if is_latest {
version.update(|n| *n += 1);
value.update(|n| *n = Some(result));
}
if in_flight.get_untracked() == 0 {
input.update(|inp| *inp = None);
// Spawn the task
Executor::spawn({
let input = self.input.clone();
let version = self.version.clone();
let value = self.value.clone();
let in_flight = self.in_flight.clone();
async move {
select! {
// if the abort message has been sent, bail and do nothing
_ = abort_rx => {
in_flight.update(|n| *n = n.saturating_sub(1));
},
// otherwise, update the value
result = fut => {
in_flight.update(|n| *n = n.saturating_sub(1));
let is_latest = version.get_untracked() <= current_version;
if is_latest {
version.update(|n| *n += 1);
value.update(|n| *n = Some(result));
}
if in_flight.get_untracked() == 0 {
input.update(|inp| *inp = None);
}
}
}
}
}
});
//}
});
}
}
}
impl<I, O> ArcAction<I, O> {
/// The number of times the action has successfully completed.
///
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// let act = ArcAction::new(|n: &u8| {
/// let n = n.to_owned();
/// async move { n * 2 }
/// });
///
/// let version = act.version();
/// act.dispatch(3);
/// assert_eq!(version.get(), 0);
///
/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
/// // after it resolves
/// assert_eq!(version.get(), 1);
/// # });
/// ```
#[track_caller]
pub fn version(&self) -> ArcRwSignal<usize> {
self.version.clone()
}
/// The current argument that was dispatched to the async function. This value will
/// be `Some` while we are waiting for it to resolve, and `None` after it has resolved.
///
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// let act = ArcAction::new(|n: &u8| {
/// let n = n.to_owned();
/// async move { n * 2 }
/// });
///
/// let input = act.input();
/// assert_eq!(input.get(), None);
/// act.dispatch(3);
/// assert_eq!(input.get(), Some(3));
///
/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
/// // after it resolves
/// assert_eq!(input.get(), None);
/// # });
/// ```
#[track_caller]
pub fn input(&self) -> ArcRwSignal<Option<I>> {
self.input.clone()
}
/// The most recent return value of the `async` function. This will be `None` before
/// the action has ever run successfully, and subsequently will always be `Some(_)`,
/// holding the old value until a new value has been received.
///
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// let act = ArcAction::new(|n: &u8| {
/// let n = n.to_owned();
/// async move { n * 2 }
/// });
///
/// let value = act.value();
/// assert_eq!(value.get(), None);
/// act.dispatch(3);
/// assert_eq!(value.get(), None);
///
/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
/// // after it resolves
/// assert_eq!(value.get(), Some(6));
/// // dispatch another value, and it still holds the old value
/// act.dispatch(3);
/// assert_eq!(value.get(), Some(6));
/// # });
/// ```
#[track_caller]
pub fn value(&self) -> ArcRwSignal<Option<O>> {
self.value.clone()
}
/// Whether the action has been dispatched and is currently waiting to resolve.
///
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// let act = ArcAction::new(|n: &u8| {
/// let n = n.to_owned();
/// async move { n * 2 }
/// });
///
/// let pending = act.pending();
/// assert_eq!(pending.get(), false);
/// act.dispatch(3);
/// assert_eq!(pending.get(), true);
///
/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
/// // after it resolves
/// assert_eq!(pending.get(), false);
/// # });
/// ```
#[track_caller]
pub fn pending(&self) -> ArcMemo<bool> {
let in_flight = self.in_flight.clone();
@ -274,6 +372,83 @@ where
}
}
/// An action runs some asynchronous code when you dispatch a new value to it, and gives you
/// reactive access to the result.
///
/// Actions are intended for mutating or updating data, not for loading data. If you find yourself
/// creating an action and immediately dispatching a value to it, this is probably the wrong
/// primitive.
///
/// The reference-counted, `Clone` (but not `Copy` version of an `Action` is an [`ArcAction`].
///
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// async fn send_new_todo_to_api(task: String) -> usize {
/// // do something...
/// // return a task id
/// 42
/// }
/// let save_data = Action::new(|task: &String| {
/// // `task` is given as `&String` because its value is available in `input`
/// send_new_todo_to_api(task.clone())
/// });
///
/// // the argument currently running
/// let input = save_data.input();
/// // the most recent returned result
/// let result_of_call = save_data.value();
/// // whether the call is pending
/// let pending = save_data.pending();
/// // how many times the action has run
/// // useful for reactively updating something else in response to a `dispatch` and response
/// let version = save_data.version();
///
/// // before we do anything
/// assert_eq!(input.get(), None); // no argument yet
/// assert_eq!(pending.get(), false); // isn't pending a response
/// assert_eq!(result_of_call.get(), None); // there's no "last value"
/// assert_eq!(version.get(), 0);
///
/// // dispatch the action
/// save_data.dispatch("My todo".to_string());
///
/// // when we're making the call
/// assert_eq!(input.get(), Some("My todo".to_string()));
/// assert_eq!(pending.get(), true); // is pending
/// assert_eq!(result_of_call.get(), None); // has not yet gotten a response
///
/// # tokio::time::sleep(std::time::Duration::from_millis(25)).await;
///
/// // after call has resolved
/// assert_eq!(input.get(), None); // input clears out after resolved
/// assert_eq!(pending.get(), false); // no longer pending
/// assert_eq!(result_of_call.get(), Some(42));
/// assert_eq!(version.get(), 1);
/// # });
/// ```
///
/// The input to the `async` function should always be a single value,
/// but it can be of any type. The argument is always passed by reference to the
/// function, because it is stored in [Action::input] as well.
///
/// ```rust
/// # use reactive_graph::actions::*;
/// // if there's a single argument, just use that
/// let action1 = Action::new(|input: &String| {
/// let input = input.clone();
/// async move { todo!() }
/// });
///
/// // if there are no arguments, use the unit type `()`
/// let action2 = Action::new(|input: &()| async { todo!() });
///
/// // if there are multiple arguments, use a tuple
/// let action3 = Action::new(|input: &(usize, String)| async { todo!() });
/// ```
pub struct Action<I, O>
where
I: 'static,
@ -295,6 +470,47 @@ where
I: Send + Sync + 'static,
O: Send + Sync + 'static,
{
/// Creates a new action. This is lazy: it does not run the action function until some value
/// is dispatched.
///
/// The constructor takes a function which will create a new `Future` from some input data.
/// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
/// be spawned.
///
/// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
/// `Future` must be `Send` so that it can be moved across threads by the async executor as
/// needed. In order to be stored in the `Copy` arena, the input and output types should also
/// be `Send + Sync`.
///
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// let act = Action::new(|n: &u8| {
/// let n = n.to_owned();
/// async move { n * 2 }
/// });
///
/// act.dispatch(3);
/// assert_eq!(act.input().get(), Some(3));
///
/// // Remember that async functions already return a future if they are
/// // not `await`ed. You can save keystrokes by leaving out the `async move`
///
/// let act2 = Action::new(|n: &String| yell(n.to_owned()));
/// act2.dispatch(String::from("i'm in a doctest"));
/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
///
/// // after it resolves
/// assert_eq!(act2.value().get(), Some("I'M IN A DOCTEST".to_string()));
///
/// async fn yell(n: String) -> String {
/// n.to_uppercase()
/// }
/// # });
/// ```
#[track_caller]
pub fn new<F, Fu>(action_fn: F) -> Self
where
@ -308,6 +524,28 @@ where
}
}
/// The number of times the action has successfully completed.
///
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// let act = Action::new(|n: &u8| {
/// let n = n.to_owned();
/// async move { n * 2 }
/// });
///
/// let version = act.version();
/// act.dispatch(3);
/// assert_eq!(version.get(), 0);
///
/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
/// // after it resolves
/// assert_eq!(version.get(), 1);
/// # });
/// ```
#[track_caller]
pub fn version(&self) -> RwSignal<usize> {
let inner = self
@ -317,6 +555,30 @@ where
inner.into()
}
/// The current argument that was dispatched to the async function. This value will
/// be `Some` while we are waiting for it to resolve, and `None` after it has resolved.
///
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// let act = ArcAction::new(|n: &u8| {
/// let n = n.to_owned();
/// async move { n * 2 }
/// });
///
/// let input = act.input();
/// assert_eq!(input.get(), None);
/// act.dispatch(3);
/// assert_eq!(input.get(), Some(3));
///
/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
/// // after it resolves
/// assert_eq!(input.get(), None);
/// # });
/// ```
#[track_caller]
pub fn input(&self) -> RwSignal<Option<I>> {
let inner = self
@ -326,6 +588,34 @@ where
inner.into()
}
/// The most recent return value of the `async` function. This will be `None` before
/// the action has ever run successfully, and subsequently will always be `Some(_)`,
/// holding the old value until a new value has been received.
///
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// let act = Action::new(|n: &u8| {
/// let n = n.to_owned();
/// async move { n * 2 }
/// });
///
/// let value = act.value();
/// assert_eq!(value.get(), None);
/// act.dispatch(3);
/// assert_eq!(value.get(), None);
///
/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
/// // after it resolves
/// assert_eq!(value.get(), Some(6));
/// // dispatch another value, and it still holds the old value
/// act.dispatch(3);
/// assert_eq!(value.get(), Some(6));
/// # });
/// ```
#[track_caller]
pub fn value(&self) -> RwSignal<Option<O>> {
let inner = self
@ -339,6 +629,38 @@ where
pub fn dispatch(&self, input: I) {
self.inner.with_value(|inner| inner.dispatch(input));
}
/// Whether the action has been dispatched and is currently waiting to resolve.
///
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// let act = Action::new(|n: &u8| {
/// let n = n.to_owned();
/// async move { n * 2 }
/// });
///
/// let pending = act.pending();
/// assert_eq!(pending.get(), false);
/// act.dispatch(3);
/// assert_eq!(pending.get(), true);
///
/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
/// // after it resolves
/// assert_eq!(pending.get(), false);
/// # });
/// ```
#[track_caller]
pub fn pending(&self) -> Memo<bool> {
let inner = self
.inner
.try_with_value(|inner| inner.pending())
.unwrap_or_else(unwrap_signal!(self));
inner.into()
}
}
impl<I, O> DefinedAt for Action<I, O>
@ -375,10 +697,51 @@ where
{
}
/// Creates a new action. This is lazy: it does not run the action function until some value
/// is dispatched.
///
/// The constructor takes a function which will create a new `Future` from some input data.
/// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
/// be spawned.
///
/// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
/// `Future` must be `Send` so that it can be moved across threads by the async executor as
/// needed. In order to be stored in the `Copy` arena, the input and output types should also
/// be `Send + Sync`.
///
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// let act = create_action(|n: &u8| {
/// let n = n.to_owned();
/// async move { n * 2 }
/// });
///
/// act.dispatch(3);
/// assert_eq!(act.input().get(), Some(3));
///
/// // Remember that async functions already return a future if they are
/// // not `await`ed. You can save keystrokes by leaving out the `async move`
///
/// let act2 = Action::new(|n: &String| yell(n.to_owned()));
/// act2.dispatch(String::from("i'm in a doctest"));
/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
///
/// // after it resolves
/// assert_eq!(act2.value().get(), Some("I'M IN A DOCTEST".to_string()));
///
/// async fn yell(n: String) -> String {
/// n.to_uppercase()
/// }
/// # });
/// ```
#[inline(always)]
#[track_caller]
#[deprecated = "This function is being removed to conform to Rust \
idioms.Please use `Action::new()` instead."]
#[deprecated = "This function is being removed to conform to Rust idioms. \
Please use `Action::new()` instead."]
pub fn create_action<I, O, F, Fu>(action_fn: F) -> Action<I, O>
where
I: Send + Sync + 'static,

View File

@ -8,6 +8,43 @@ use crate::{
use any_spawner::Executor;
use std::{fmt::Debug, future::Future, panic::Location, pin::Pin, sync::Arc};
/// An action that synchronizes multiple imperative `async` calls to the reactive system,
/// tracking the progress of each one.
///
/// Where an [`Action`](super::Action) fires a single call, a `MultiAction` allows you to
/// keep track of multiple in-flight actions.
///
/// If youre trying to load data by running an `async` function reactively, you probably
/// want to use an [`AsyncDerived`](crate::computed::AsyncDerived) instead.
/// If youre trying to occasionally run an `async` function in response to something
/// like a user adding a task to a todo list, youre in the right place.
///
/// The reference-counted, `Clone` (but not `Copy` version of a `MultiAction` is an [`ArcMultiAction`].
///
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// async fn send_new_todo_to_api(task: String) -> usize {
/// // do something...
/// // return a task id
/// 42
/// }
/// let add_todo = MultiAction::new(|task: &String| {
/// // `task` is given as `&String` because its value is available in `input`
/// send_new_todo_to_api(task.clone())
/// });
///
/// add_todo.dispatch("Buy milk".to_string());
/// add_todo.dispatch("???".to_string());
/// add_todo.dispatch("Profit!!!".to_string());
///
/// let submissions = add_todo.submissions();
/// assert_eq!(submissions.with(Vec::len), 3);
/// # });
/// ```
pub struct MultiAction<I, O>
where
I: 'static,
@ -63,6 +100,32 @@ where
I: Send + Sync + 'static,
O: Send + Sync + 'static,
{
/// Creates a new multi-action.
///
/// The input to the `async` function should always be a single value,
/// but it can be of any type. The argument is always passed by reference to the
/// function, because it is stored in [Submission::input] as well.
///
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// // if there's a single argument, just use that
/// let action1 = MultiAction::new(|input: &String| {
/// let input = input.clone();
/// async move { todo!() }
/// });
///
/// // if there are no arguments, use the unit type `()`
/// let action2 = MultiAction::new(|input: &()| async { todo!() });
///
/// // if there are multiple arguments, use a tuple
/// let action3 =
/// MultiAction::new(|input: &(usize, String)| async { todo!() });
/// # });
/// ```
#[track_caller]
pub fn new<Fut>(
action_fn: impl Fn(&I) -> Fut + Send + Sync + 'static,
@ -78,6 +141,48 @@ where
}
/// Calls the `async` function with a reference to the input type as its argument.
///
/// This can be called any number of times: each submission will be dispatched, running
/// concurrently, and its status can be checked via the
/// [`submissions()`](MultiAction::submissions) signal.
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// async fn send_new_todo_to_api(task: String) -> usize {
/// // do something...
/// // return a task id
/// 42
/// }
/// let add_todo = MultiAction::new(|task: &String| {
/// // `task` is given as `&String` because its value is available in `input`
/// send_new_todo_to_api(task.clone())
/// });
///
/// let submissions = add_todo.submissions();
/// let pending_submissions = move || {
/// submissions.with(|subs| subs.iter().filter(|sub| sub.pending().get()).count())
/// };
///
/// add_todo.dispatch("Buy milk".to_string());
/// assert_eq!(submissions.with(Vec::len), 1);
/// assert_eq!(pending_submissions(), 1);
///
/// add_todo.dispatch("???".to_string());
/// add_todo.dispatch("Profit!!!".to_string());
///
/// assert_eq!(submissions.with(Vec::len), 3);
/// assert_eq!(pending_submissions(), 3);
///
/// // when submissions resolve, they are not removed from the set
/// // however, their `pending` signal is now `false`, and this can be used to filter them
/// # tokio::time::sleep(std::time::Duration::from_millis(100)).await;
/// assert_eq!(submissions.with(Vec::len), 3);
/// assert_eq!(pending_submissions(), 0);
/// # });
/// ```
pub fn dispatch(&self, input: I) {
if !is_suppressing_resource_load() {
self.inner.with_value(|inner| inner.dispatch(input));
@ -86,13 +191,72 @@ where
/// Synchronously adds a submission with the given value.
///
/// This takes the output value, rather than the input, because it is adding a result, not an
/// input.
///
/// This can be useful for use cases like handling errors, where the error can already be known
/// on the client side.
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// async fn send_new_todo_to_api(task: String) -> usize {
/// // do something...
/// // return a task id
/// 42
/// }
/// let add_todo = MultiAction::new(|task: &String| {
/// // `task` is given as `&String` because its value is available in `input`
/// send_new_todo_to_api(task.clone())
/// });
///
/// let submissions = add_todo.submissions();
/// let pending_submissions = move || {
/// submissions.with(|subs| subs.iter().filter(|sub| sub.pending().get()).count())
/// };
///
/// add_todo.dispatch("Buy milk".to_string());
/// assert_eq!(submissions.with(Vec::len), 1);
/// assert_eq!(pending_submissions(), 1);
///
/// add_todo.dispatch_sync(42);
///
/// assert_eq!(submissions.with(Vec::len), 2);
/// assert_eq!(pending_submissions(), 1);
/// # });
/// ```
pub fn dispatch_sync(&self, value: O) {
self.inner.with_value(|inner| inner.dispatch_sync(value));
}
/// The set of all submissions to this multi-action.
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// async fn send_new_todo_to_api(task: String) -> usize {
/// // do something...
/// // return a task id
/// 42
/// }
/// let add_todo = MultiAction::new(|task: &String| {
/// // `task` is given as `&String` because its value is available in `input`
/// send_new_todo_to_api(task.clone())
/// });
///
/// let submissions = add_todo.submissions();
///
/// add_todo.dispatch("Buy milk".to_string());
/// add_todo.dispatch("???".to_string());
/// add_todo.dispatch("Profit!!!".to_string());
///
/// assert_eq!(submissions.with(Vec::len), 3);
/// # });
/// ```
pub fn submissions(&self) -> ReadSignal<Vec<ArcSubmission<I, O>>> {
self.inner
.try_with_value(|inner| inner.submissions())
@ -101,6 +265,35 @@ where
}
/// How many times an action has successfully resolved.
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// async fn send_new_todo_to_api(task: String) -> usize {
/// // do something...
/// // return a task id
/// 42
/// }
/// let add_todo = MultiAction::new(|task: &String| {
/// // `task` is given as `&String` because its value is available in `input`
/// send_new_todo_to_api(task.clone())
/// });
///
/// let version = add_todo.version();
///
/// add_todo.dispatch("Buy milk".to_string());
/// add_todo.dispatch("???".to_string());
/// add_todo.dispatch("Profit!!!".to_string());
///
/// assert_eq!(version.get(), 0);
/// # tokio::time::sleep(std::time::Duration::from_millis(100)).await;
///
/// // when they've all resolved
/// assert_eq!(version.get(), 3);
/// # });
/// ```
pub fn version(&self) -> RwSignal<usize> {
self.inner
.try_with_value(|inner| inner.version())
@ -109,6 +302,43 @@ where
}
}
/// An action that synchronizes multiple imperative `async` calls to the reactive system,
/// tracking the progress of each one.
///
/// Where an [`Action`](super::Action) fires a single call, a `MultiAction` allows you to
/// keep track of multiple in-flight actions.
///
/// If youre trying to load data by running an `async` function reactively, you probably
/// want to use an [`AsyncDerived`](crate::computed::AsyncDerived) instead.
/// If youre trying to occasionally run an `async` function in response to something
/// like a user adding a task to a todo list, youre in the right place.
///
/// The arena-allocated, `Copy` version of an `ArcMultiAction` is a [`MultiAction`].
///
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// async fn send_new_todo_to_api(task: String) -> usize {
/// // do something...
/// // return a task id
/// 42
/// }
/// let add_todo = ArcMultiAction::new(|task: &String| {
/// // `task` is given as `&String` because its value is available in `input`
/// send_new_todo_to_api(task.clone())
/// });
///
/// add_todo.dispatch("Buy milk".to_string());
/// add_todo.dispatch("???".to_string());
/// add_todo.dispatch("Profit!!!".to_string());
///
/// let submissions = add_todo.submissions();
/// assert_eq!(submissions.with(Vec::len), 3);
/// # });
/// ```
pub struct ArcMultiAction<I, O>
where
I: 'static,
@ -151,9 +381,36 @@ where
impl<I, O> ArcMultiAction<I, O>
where
I: 'static,
O: 'static,
I: Send + Sync + 'static,
O: Send + Sync + 'static,
{
/// Creates a new multi-action.
///
/// The input to the `async` function should always be a single value,
/// but it can be of any type. The argument is always passed by reference to the
/// function, because it is stored in [Submission::input] as well.
///
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// // if there's a single argument, just use that
/// let action1 = ArcMultiAction::new(|input: &String| {
/// let input = input.clone();
/// async move { todo!() }
/// });
///
/// // if there are no arguments, use the unit type `()`
/// let action2 = ArcMultiAction::new(|input: &()| async { todo!() });
///
/// // if there are multiple arguments, use a tuple
/// let action3 =
/// ArcMultiAction::new(|input: &(usize, String)| async { todo!() });
/// # });
/// ```
#[track_caller]
pub fn new<Fut>(
action_fn: impl Fn(&I) -> Fut + Send + Sync + 'static,
) -> Self
@ -172,6 +429,51 @@ where
}
/// Calls the `async` function with a reference to the input type as its argument.
///
/// This can be called any number of times: each submission will be dispatched, running
/// concurrently, and its status can be checked via the
/// [`submissions()`](MultiAction::submissions) signal.
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// async fn send_new_todo_to_api(task: String) -> usize {
/// // do something...
/// // return a task id
/// 42
/// }
/// let add_todo = ArcMultiAction::new(|task: &String| {
/// // `task` is given as `&String` because its value is available in `input`
/// send_new_todo_to_api(task.clone())
/// });
///
/// let submissions = add_todo.submissions();
/// let pending_submissions = {
/// let submissions = submissions.clone();
/// move || {
/// submissions.with(|subs| subs.iter().filter(|sub| sub.pending().get()).count())
/// }
/// };
///
/// add_todo.dispatch("Buy milk".to_string());
/// assert_eq!(submissions.with(Vec::len), 1);
/// assert_eq!(pending_submissions(), 1);
///
/// add_todo.dispatch("???".to_string());
/// add_todo.dispatch("Profit!!!".to_string());
///
/// assert_eq!(submissions.with(Vec::len), 3);
/// assert_eq!(pending_submissions(), 3);
///
/// // when submissions resolve, they are not removed from the set
/// // however, their `pending` signal is now `false`, and this can be used to filter them
/// # tokio::time::sleep(std::time::Duration::from_millis(100)).await;
/// assert_eq!(submissions.with(Vec::len), 3);
/// assert_eq!(pending_submissions(), 0);
/// # });
/// ```
pub fn dispatch(&self, input: I) {
if !is_suppressing_resource_load() {
let fut = (self.action_fn)(&input);
@ -188,7 +490,7 @@ where
let version = self.version.clone();
Executor::spawn_local(async move {
Executor::spawn(async move {
let new_value = fut.await;
let canceled = submission.canceled.get_untracked();
if !canceled {
@ -203,13 +505,50 @@ where
/// Synchronously adds a submission with the given value.
///
/// This takes the output value, rather than the input, because it is adding a result, not an
/// input.
///
/// This can be useful for use cases like handling errors, where the error can already be known
/// on the client side.
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// async fn send_new_todo_to_api(task: String) -> usize {
/// // do something...
/// // return a task id
/// 42
/// }
/// let add_todo = ArcMultiAction::new(|task: &String| {
/// // `task` is given as `&String` because its value is available in `input`
/// send_new_todo_to_api(task.clone())
/// });
///
/// let submissions = add_todo.submissions();
/// let pending_submissions = {
/// let submissions = submissions.clone();
/// move || {
/// submissions.with(|subs| subs.iter().filter(|sub| sub.pending().get()).count())
/// }
/// };
///
/// add_todo.dispatch("Buy milk".to_string());
/// assert_eq!(submissions.with(Vec::len), 1);
/// assert_eq!(pending_submissions(), 1);
///
/// add_todo.dispatch_sync(42);
///
/// assert_eq!(submissions.with(Vec::len), 2);
/// assert_eq!(pending_submissions(), 1);
/// # });
/// ```
pub fn dispatch_sync(&self, value: O) {
let submission = ArcSubmission {
input: ArcRwSignal::new(None),
value: ArcRwSignal::new(Some(value)),
pending: ArcRwSignal::new(true),
pending: ArcRwSignal::new(false),
canceled: ArcRwSignal::new(false),
};
@ -219,11 +558,65 @@ where
}
/// The set of all submissions to this multi-action.
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// async fn send_new_todo_to_api(task: String) -> usize {
/// // do something...
/// // return a task id
/// 42
/// }
/// let add_todo = ArcMultiAction::new(|task: &String| {
/// // `task` is given as `&String` because its value is available in `input`
/// send_new_todo_to_api(task.clone())
/// });
///
/// let submissions = add_todo.submissions();
///
/// add_todo.dispatch("Buy milk".to_string());
/// add_todo.dispatch("???".to_string());
/// add_todo.dispatch("Profit!!!".to_string());
///
/// assert_eq!(submissions.with(Vec::len), 3);
/// # });
/// ```
pub fn submissions(&self) -> ArcReadSignal<Vec<ArcSubmission<I, O>>> {
self.submissions.read_only()
}
/// How many times an action has successfully resolved.
/// ```rust
/// # use reactive_graph::actions::*;
/// # use reactive_graph::prelude::*;
/// # tokio_test::block_on(async move {
/// # any_spawner::Executor::init_tokio();
/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
/// async fn send_new_todo_to_api(task: String) -> usize {
/// // do something...
/// // return a task id
/// 42
/// }
/// let add_todo = ArcMultiAction::new(|task: &String| {
/// // `task` is given as `&String` because its value is available in `input`
/// send_new_todo_to_api(task.clone())
/// });
///
/// let version = add_todo.version();
///
/// add_todo.dispatch("Buy milk".to_string());
/// add_todo.dispatch("???".to_string());
/// add_todo.dispatch("Profit!!!".to_string());
///
/// assert_eq!(version.get(), 0);
/// # tokio::time::sleep(std::time::Duration::from_millis(100)).await;
///
/// // when they've all resolved
/// assert_eq!(version.get(), 3);
/// # });
/// ```
pub fn version(&self) -> ArcRwSignal<usize> {
self.version.clone()
}
@ -251,23 +644,37 @@ where
I: 'static,
O: 'static,
{
/// The current argument that was dispatched to the `async` function.
/// `Some` while we are waiting for it to resolve, `None` if it has resolved.
#[track_caller]
pub fn input(&self) -> ArcReadSignal<Option<I>> {
self.input.read_only()
}
/// The most recent return value of the `async` function.
#[track_caller]
pub fn value(&self) -> ArcReadSignal<Option<O>> {
self.value.read_only()
}
/// Whether this submision is still waiting to resolve.
#[track_caller]
pub fn pending(&self) -> ArcReadSignal<bool> {
self.pending.read_only()
}
/// Whether this submission has been canceled.
#[track_caller]
pub fn canceled(&self) -> ArcReadSignal<bool> {
self.canceled.read_only()
}
/// Cancels the submission. This will not necessarily prevent the `Future`
/// from continuing to run, but it will update the returned value.
#[track_caller]
pub fn cancel(&self) {
// TODO if we set these up to race against a cancel signal, we could actually drop the
// futures
self.canceled.try_set(true);
}
}
@ -325,22 +732,34 @@ where
I: Send + Sync + 'static,
O: Send + Sync + 'static,
{
/// The current argument that was dispatched to the `async` function.
/// `Some` while we are waiting for it to resolve, `None` if it has resolved.
#[track_caller]
pub fn input(&self) -> ReadSignal<Option<I>> {
self.input.read_only()
}
/// The most recent return value of the `async` function.
#[track_caller]
pub fn value(&self) -> ReadSignal<Option<O>> {
self.value.read_only()
}
/// Whether this submision is still waiting to resolve.
#[track_caller]
pub fn pending(&self) -> ReadSignal<bool> {
self.pending.read_only()
}
/// Whether this submission has been canceled.
#[track_caller]
pub fn canceled(&self) -> ReadSignal<bool> {
self.canceled.read_only()
}
/// Cancels the submission. This will not necessarily prevent the `Future`
/// from continuing to run, but it will update the returned value.
#[track_caller]
pub fn cancel(&self) {
self.canceled.try_set(true);
}