From fd048295a4981f84e4772421a8f927e4d8d61304 Mon Sep 17 00:00:00 2001 From: Greg Johnston Date: Sun, 12 May 2024 21:05:09 -0400 Subject: [PATCH] docs: full docs and doctests for Action/MultiAction --- reactive_graph/src/actions/action.rs | 453 +++++++++++++++++++-- reactive_graph/src/actions/multi_action.rs | 427 ++++++++++++++++++- 2 files changed, 831 insertions(+), 49 deletions(-) diff --git a/reactive_graph/src/actions/action.rs b/reactive_graph/src/actions/action.rs index 0a21c0e31..1e26e2050 100644 --- a/reactive_graph/src/actions/action.rs +++ b/reactive_graph/src/actions/action.rs @@ -1,5 +1,5 @@ use crate::{ - computed::ArcMemo, + computed::{ArcMemo, Memo}, diagnostics::is_suppressing_resource_load, owner::{Owner, StoredValue}, signal::{ArcRwSignal, RwSignal}, @@ -10,7 +10,7 @@ use any_spawner::Executor; use futures::{channel::oneshot, select, FutureExt}; use std::{future::Future, panic::Location, pin::Pin, sync::Arc}; -/// An action run some asynchronous code when you dispatch a new value to it, and gives you +/// An action runs some asynchronous code when you dispatch a new value to it, and gives you /// reactive access to the result. /// /// Actions are intended for mutating or updating data, not for loading data. If you find yourself @@ -86,6 +86,7 @@ use std::{future::Future, panic::Location, pin::Pin, sync::Arc}; /// /// // if there are multiple arguments, use a tuple /// let action3 = ArcAction::new(|input: &(usize, String)| async { todo!() }); +/// ``` pub struct ArcAction where I: 'static, @@ -126,7 +127,7 @@ where I: Send + Sync + 'static, O: Send + Sync + 'static, { - /// Creates a new action. Thi is lazy: it does not run the action function until some value + /// Creates a new action. This is lazy: it does not run the action function until some value /// is dispatched. /// /// The constructor takes a function which will create a new `Future` from some input data. @@ -185,71 +186,168 @@ where #[track_caller] pub fn dispatch(&self, input: I) { - //if !is_suppressing_resource_load() { - let mut fut = (self.action_fn)(&input).fuse(); + if !is_suppressing_resource_load() { + let mut fut = (self.action_fn)(&input).fuse(); - // abort this task if the owner is cleaned up - let (abort_tx, mut abort_rx) = oneshot::channel(); - Owner::on_cleanup(move || { - abort_tx.send(()).expect( - "tried to cancel a future in ArcAction::dispatch(), but the \ - channel has already closed", - ); - }); + // abort this task if the owner is cleaned up + let (abort_tx, mut abort_rx) = oneshot::channel(); + Owner::on_cleanup(move || { + abort_tx.send(()).expect( + "tried to cancel a future in ArcAction::dispatch(), but \ + the channel has already closed", + ); + }); - // Update the state before loading - self.in_flight.update(|n| *n += 1); - let current_version = - self.version.try_get_untracked().unwrap_or_default(); - self.input.try_update(|inp| *inp = Some(input)); + // Update the state before loading + self.in_flight.update(|n| *n += 1); + let current_version = + self.version.try_get_untracked().unwrap_or_default(); + self.input.try_update(|inp| *inp = Some(input)); - // Spawn the task - Executor::spawn({ - let input = self.input.clone(); - let version = self.version.clone(); - let value = self.value.clone(); - let in_flight = self.in_flight.clone(); - async move { - select! { - // if the abort message has been sent, bail and do nothing - _ = abort_rx => { - in_flight.update(|n| *n = n.saturating_sub(1)); - }, - // otherwise, update the value - result = fut => { - in_flight.update(|n| *n = n.saturating_sub(1)); - let is_latest = version.get_untracked() <= current_version; - if is_latest { - version.update(|n| *n += 1); - value.update(|n| *n = Some(result)); - } - if in_flight.get_untracked() == 0 { - input.update(|inp| *inp = None); + // Spawn the task + Executor::spawn({ + let input = self.input.clone(); + let version = self.version.clone(); + let value = self.value.clone(); + let in_flight = self.in_flight.clone(); + async move { + select! { + // if the abort message has been sent, bail and do nothing + _ = abort_rx => { + in_flight.update(|n| *n = n.saturating_sub(1)); + }, + // otherwise, update the value + result = fut => { + in_flight.update(|n| *n = n.saturating_sub(1)); + let is_latest = version.get_untracked() <= current_version; + if is_latest { + version.update(|n| *n += 1); + value.update(|n| *n = Some(result)); + } + if in_flight.get_untracked() == 0 { + input.update(|inp| *inp = None); + } } } } - } - }); - //} + }); + } } } impl ArcAction { + /// The number of times the action has successfully completed. + /// + /// ```rust + /// # use reactive_graph::actions::*; + /// # use reactive_graph::prelude::*; + /// # tokio_test::block_on(async move { + /// # any_spawner::Executor::init_tokio(); + /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); + /// let act = ArcAction::new(|n: &u8| { + /// let n = n.to_owned(); + /// async move { n * 2 } + /// }); + /// + /// let version = act.version(); + /// act.dispatch(3); + /// assert_eq!(version.get(), 0); + /// + /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await; + /// // after it resolves + /// assert_eq!(version.get(), 1); + /// # }); + /// ``` #[track_caller] pub fn version(&self) -> ArcRwSignal { self.version.clone() } + /// The current argument that was dispatched to the async function. This value will + /// be `Some` while we are waiting for it to resolve, and `None` after it has resolved. + /// + /// ```rust + /// # use reactive_graph::actions::*; + /// # use reactive_graph::prelude::*; + /// # tokio_test::block_on(async move { + /// # any_spawner::Executor::init_tokio(); + /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); + /// let act = ArcAction::new(|n: &u8| { + /// let n = n.to_owned(); + /// async move { n * 2 } + /// }); + /// + /// let input = act.input(); + /// assert_eq!(input.get(), None); + /// act.dispatch(3); + /// assert_eq!(input.get(), Some(3)); + /// + /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await; + /// // after it resolves + /// assert_eq!(input.get(), None); + /// # }); + /// ``` #[track_caller] pub fn input(&self) -> ArcRwSignal> { self.input.clone() } + /// The most recent return value of the `async` function. This will be `None` before + /// the action has ever run successfully, and subsequently will always be `Some(_)`, + /// holding the old value until a new value has been received. + /// + /// ```rust + /// # use reactive_graph::actions::*; + /// # use reactive_graph::prelude::*; + /// # tokio_test::block_on(async move { + /// # any_spawner::Executor::init_tokio(); + /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); + /// let act = ArcAction::new(|n: &u8| { + /// let n = n.to_owned(); + /// async move { n * 2 } + /// }); + /// + /// let value = act.value(); + /// assert_eq!(value.get(), None); + /// act.dispatch(3); + /// assert_eq!(value.get(), None); + /// + /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await; + /// // after it resolves + /// assert_eq!(value.get(), Some(6)); + /// // dispatch another value, and it still holds the old value + /// act.dispatch(3); + /// assert_eq!(value.get(), Some(6)); + /// # }); + /// ``` #[track_caller] pub fn value(&self) -> ArcRwSignal> { self.value.clone() } + /// Whether the action has been dispatched and is currently waiting to resolve. + /// + /// ```rust + /// # use reactive_graph::actions::*; + /// # use reactive_graph::prelude::*; + /// # tokio_test::block_on(async move { + /// # any_spawner::Executor::init_tokio(); + /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); + /// let act = ArcAction::new(|n: &u8| { + /// let n = n.to_owned(); + /// async move { n * 2 } + /// }); + /// + /// let pending = act.pending(); + /// assert_eq!(pending.get(), false); + /// act.dispatch(3); + /// assert_eq!(pending.get(), true); + /// + /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await; + /// // after it resolves + /// assert_eq!(pending.get(), false); + /// # }); + /// ``` #[track_caller] pub fn pending(&self) -> ArcMemo { let in_flight = self.in_flight.clone(); @@ -274,6 +372,83 @@ where } } +/// An action runs some asynchronous code when you dispatch a new value to it, and gives you +/// reactive access to the result. +/// +/// Actions are intended for mutating or updating data, not for loading data. If you find yourself +/// creating an action and immediately dispatching a value to it, this is probably the wrong +/// primitive. +/// +/// The reference-counted, `Clone` (but not `Copy` version of an `Action` is an [`ArcAction`]. +/// +/// ```rust +/// # use reactive_graph::actions::*; +/// # use reactive_graph::prelude::*; +/// # tokio_test::block_on(async move { +/// # any_spawner::Executor::init_tokio(); +/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); +/// async fn send_new_todo_to_api(task: String) -> usize { +/// // do something... +/// // return a task id +/// 42 +/// } +/// let save_data = Action::new(|task: &String| { +/// // `task` is given as `&String` because its value is available in `input` +/// send_new_todo_to_api(task.clone()) +/// }); +/// +/// // the argument currently running +/// let input = save_data.input(); +/// // the most recent returned result +/// let result_of_call = save_data.value(); +/// // whether the call is pending +/// let pending = save_data.pending(); +/// // how many times the action has run +/// // useful for reactively updating something else in response to a `dispatch` and response +/// let version = save_data.version(); +/// +/// // before we do anything +/// assert_eq!(input.get(), None); // no argument yet +/// assert_eq!(pending.get(), false); // isn't pending a response +/// assert_eq!(result_of_call.get(), None); // there's no "last value" +/// assert_eq!(version.get(), 0); +/// +/// // dispatch the action +/// save_data.dispatch("My todo".to_string()); +/// +/// // when we're making the call +/// assert_eq!(input.get(), Some("My todo".to_string())); +/// assert_eq!(pending.get(), true); // is pending +/// assert_eq!(result_of_call.get(), None); // has not yet gotten a response +/// +/// # tokio::time::sleep(std::time::Duration::from_millis(25)).await; +/// +/// // after call has resolved +/// assert_eq!(input.get(), None); // input clears out after resolved +/// assert_eq!(pending.get(), false); // no longer pending +/// assert_eq!(result_of_call.get(), Some(42)); +/// assert_eq!(version.get(), 1); +/// # }); +/// ``` +/// +/// The input to the `async` function should always be a single value, +/// but it can be of any type. The argument is always passed by reference to the +/// function, because it is stored in [Action::input] as well. +/// +/// ```rust +/// # use reactive_graph::actions::*; +/// // if there's a single argument, just use that +/// let action1 = Action::new(|input: &String| { +/// let input = input.clone(); +/// async move { todo!() } +/// }); +/// +/// // if there are no arguments, use the unit type `()` +/// let action2 = Action::new(|input: &()| async { todo!() }); +/// +/// // if there are multiple arguments, use a tuple +/// let action3 = Action::new(|input: &(usize, String)| async { todo!() }); +/// ``` pub struct Action where I: 'static, @@ -295,6 +470,47 @@ where I: Send + Sync + 'static, O: Send + Sync + 'static, { + /// Creates a new action. This is lazy: it does not run the action function until some value + /// is dispatched. + /// + /// The constructor takes a function which will create a new `Future` from some input data. + /// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will + /// be spawned. + /// + /// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The + /// `Future` must be `Send` so that it can be moved across threads by the async executor as + /// needed. In order to be stored in the `Copy` arena, the input and output types should also + /// be `Send + Sync`. + /// + /// ```rust + /// # use reactive_graph::actions::*; + /// # use reactive_graph::prelude::*; + /// # tokio_test::block_on(async move { + /// # any_spawner::Executor::init_tokio(); + /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); + /// let act = Action::new(|n: &u8| { + /// let n = n.to_owned(); + /// async move { n * 2 } + /// }); + /// + /// act.dispatch(3); + /// assert_eq!(act.input().get(), Some(3)); + /// + /// // Remember that async functions already return a future if they are + /// // not `await`ed. You can save keystrokes by leaving out the `async move` + /// + /// let act2 = Action::new(|n: &String| yell(n.to_owned())); + /// act2.dispatch(String::from("i'm in a doctest")); + /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await; + /// + /// // after it resolves + /// assert_eq!(act2.value().get(), Some("I'M IN A DOCTEST".to_string())); + /// + /// async fn yell(n: String) -> String { + /// n.to_uppercase() + /// } + /// # }); + /// ``` #[track_caller] pub fn new(action_fn: F) -> Self where @@ -308,6 +524,28 @@ where } } + /// The number of times the action has successfully completed. + /// + /// ```rust + /// # use reactive_graph::actions::*; + /// # use reactive_graph::prelude::*; + /// # tokio_test::block_on(async move { + /// # any_spawner::Executor::init_tokio(); + /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); + /// let act = Action::new(|n: &u8| { + /// let n = n.to_owned(); + /// async move { n * 2 } + /// }); + /// + /// let version = act.version(); + /// act.dispatch(3); + /// assert_eq!(version.get(), 0); + /// + /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await; + /// // after it resolves + /// assert_eq!(version.get(), 1); + /// # }); + /// ``` #[track_caller] pub fn version(&self) -> RwSignal { let inner = self @@ -317,6 +555,30 @@ where inner.into() } + /// The current argument that was dispatched to the async function. This value will + /// be `Some` while we are waiting for it to resolve, and `None` after it has resolved. + /// + /// ```rust + /// # use reactive_graph::actions::*; + /// # use reactive_graph::prelude::*; + /// # tokio_test::block_on(async move { + /// # any_spawner::Executor::init_tokio(); + /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); + /// let act = ArcAction::new(|n: &u8| { + /// let n = n.to_owned(); + /// async move { n * 2 } + /// }); + /// + /// let input = act.input(); + /// assert_eq!(input.get(), None); + /// act.dispatch(3); + /// assert_eq!(input.get(), Some(3)); + /// + /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await; + /// // after it resolves + /// assert_eq!(input.get(), None); + /// # }); + /// ``` #[track_caller] pub fn input(&self) -> RwSignal> { let inner = self @@ -326,6 +588,34 @@ where inner.into() } + /// The most recent return value of the `async` function. This will be `None` before + /// the action has ever run successfully, and subsequently will always be `Some(_)`, + /// holding the old value until a new value has been received. + /// + /// ```rust + /// # use reactive_graph::actions::*; + /// # use reactive_graph::prelude::*; + /// # tokio_test::block_on(async move { + /// # any_spawner::Executor::init_tokio(); + /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); + /// let act = Action::new(|n: &u8| { + /// let n = n.to_owned(); + /// async move { n * 2 } + /// }); + /// + /// let value = act.value(); + /// assert_eq!(value.get(), None); + /// act.dispatch(3); + /// assert_eq!(value.get(), None); + /// + /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await; + /// // after it resolves + /// assert_eq!(value.get(), Some(6)); + /// // dispatch another value, and it still holds the old value + /// act.dispatch(3); + /// assert_eq!(value.get(), Some(6)); + /// # }); + /// ``` #[track_caller] pub fn value(&self) -> RwSignal> { let inner = self @@ -339,6 +629,38 @@ where pub fn dispatch(&self, input: I) { self.inner.with_value(|inner| inner.dispatch(input)); } + + /// Whether the action has been dispatched and is currently waiting to resolve. + /// + /// ```rust + /// # use reactive_graph::actions::*; + /// # use reactive_graph::prelude::*; + /// # tokio_test::block_on(async move { + /// # any_spawner::Executor::init_tokio(); + /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); + /// let act = Action::new(|n: &u8| { + /// let n = n.to_owned(); + /// async move { n * 2 } + /// }); + /// + /// let pending = act.pending(); + /// assert_eq!(pending.get(), false); + /// act.dispatch(3); + /// assert_eq!(pending.get(), true); + /// + /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await; + /// // after it resolves + /// assert_eq!(pending.get(), false); + /// # }); + /// ``` + #[track_caller] + pub fn pending(&self) -> Memo { + let inner = self + .inner + .try_with_value(|inner| inner.pending()) + .unwrap_or_else(unwrap_signal!(self)); + inner.into() + } } impl DefinedAt for Action @@ -375,10 +697,51 @@ where { } +/// Creates a new action. This is lazy: it does not run the action function until some value +/// is dispatched. +/// +/// The constructor takes a function which will create a new `Future` from some input data. +/// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will +/// be spawned. +/// +/// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The +/// `Future` must be `Send` so that it can be moved across threads by the async executor as +/// needed. In order to be stored in the `Copy` arena, the input and output types should also +/// be `Send + Sync`. +/// +/// ```rust +/// # use reactive_graph::actions::*; +/// # use reactive_graph::prelude::*; +/// # tokio_test::block_on(async move { +/// # any_spawner::Executor::init_tokio(); +/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); +/// let act = create_action(|n: &u8| { +/// let n = n.to_owned(); +/// async move { n * 2 } +/// }); +/// +/// act.dispatch(3); +/// assert_eq!(act.input().get(), Some(3)); +/// +/// // Remember that async functions already return a future if they are +/// // not `await`ed. You can save keystrokes by leaving out the `async move` +/// +/// let act2 = Action::new(|n: &String| yell(n.to_owned())); +/// act2.dispatch(String::from("i'm in a doctest")); +/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await; +/// +/// // after it resolves +/// assert_eq!(act2.value().get(), Some("I'M IN A DOCTEST".to_string())); +/// +/// async fn yell(n: String) -> String { +/// n.to_uppercase() +/// } +/// # }); +/// ``` #[inline(always)] #[track_caller] -#[deprecated = "This function is being removed to conform to Rust \ - idioms.Please use `Action::new()` instead."] +#[deprecated = "This function is being removed to conform to Rust idioms. \ + Please use `Action::new()` instead."] pub fn create_action(action_fn: F) -> Action where I: Send + Sync + 'static, diff --git a/reactive_graph/src/actions/multi_action.rs b/reactive_graph/src/actions/multi_action.rs index 5abf8f56c..2367ea004 100644 --- a/reactive_graph/src/actions/multi_action.rs +++ b/reactive_graph/src/actions/multi_action.rs @@ -8,6 +8,43 @@ use crate::{ use any_spawner::Executor; use std::{fmt::Debug, future::Future, panic::Location, pin::Pin, sync::Arc}; +/// An action that synchronizes multiple imperative `async` calls to the reactive system, +/// tracking the progress of each one. +/// +/// Where an [`Action`](super::Action) fires a single call, a `MultiAction` allows you to +/// keep track of multiple in-flight actions. +/// +/// If you’re trying to load data by running an `async` function reactively, you probably +/// want to use an [`AsyncDerived`](crate::computed::AsyncDerived) instead. +/// If you’re trying to occasionally run an `async` function in response to something +/// like a user adding a task to a todo list, you’re in the right place. +/// +/// The reference-counted, `Clone` (but not `Copy` version of a `MultiAction` is an [`ArcMultiAction`]. +/// +/// ```rust +/// # use reactive_graph::actions::*; +/// # use reactive_graph::prelude::*; +/// # tokio_test::block_on(async move { +/// # any_spawner::Executor::init_tokio(); +/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); +/// async fn send_new_todo_to_api(task: String) -> usize { +/// // do something... +/// // return a task id +/// 42 +/// } +/// let add_todo = MultiAction::new(|task: &String| { +/// // `task` is given as `&String` because its value is available in `input` +/// send_new_todo_to_api(task.clone()) +/// }); +/// +/// add_todo.dispatch("Buy milk".to_string()); +/// add_todo.dispatch("???".to_string()); +/// add_todo.dispatch("Profit!!!".to_string()); +/// +/// let submissions = add_todo.submissions(); +/// assert_eq!(submissions.with(Vec::len), 3); +/// # }); +/// ``` pub struct MultiAction where I: 'static, @@ -63,6 +100,32 @@ where I: Send + Sync + 'static, O: Send + Sync + 'static, { + /// Creates a new multi-action. + /// + /// The input to the `async` function should always be a single value, + /// but it can be of any type. The argument is always passed by reference to the + /// function, because it is stored in [Submission::input] as well. + /// + /// ```rust + /// # use reactive_graph::actions::*; + /// # use reactive_graph::prelude::*; + /// # tokio_test::block_on(async move { + /// # any_spawner::Executor::init_tokio(); + /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); + /// // if there's a single argument, just use that + /// let action1 = MultiAction::new(|input: &String| { + /// let input = input.clone(); + /// async move { todo!() } + /// }); + /// + /// // if there are no arguments, use the unit type `()` + /// let action2 = MultiAction::new(|input: &()| async { todo!() }); + /// + /// // if there are multiple arguments, use a tuple + /// let action3 = + /// MultiAction::new(|input: &(usize, String)| async { todo!() }); + /// # }); + /// ``` #[track_caller] pub fn new( action_fn: impl Fn(&I) -> Fut + Send + Sync + 'static, @@ -78,6 +141,48 @@ where } /// Calls the `async` function with a reference to the input type as its argument. + /// + /// This can be called any number of times: each submission will be dispatched, running + /// concurrently, and its status can be checked via the + /// [`submissions()`](MultiAction::submissions) signal. + /// ```rust + /// # use reactive_graph::actions::*; + /// # use reactive_graph::prelude::*; + /// # tokio_test::block_on(async move { + /// # any_spawner::Executor::init_tokio(); + /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); + /// async fn send_new_todo_to_api(task: String) -> usize { + /// // do something... + /// // return a task id + /// 42 + /// } + /// let add_todo = MultiAction::new(|task: &String| { + /// // `task` is given as `&String` because its value is available in `input` + /// send_new_todo_to_api(task.clone()) + /// }); + /// + /// let submissions = add_todo.submissions(); + /// let pending_submissions = move || { + /// submissions.with(|subs| subs.iter().filter(|sub| sub.pending().get()).count()) + /// }; + /// + /// add_todo.dispatch("Buy milk".to_string()); + /// assert_eq!(submissions.with(Vec::len), 1); + /// assert_eq!(pending_submissions(), 1); + /// + /// add_todo.dispatch("???".to_string()); + /// add_todo.dispatch("Profit!!!".to_string()); + /// + /// assert_eq!(submissions.with(Vec::len), 3); + /// assert_eq!(pending_submissions(), 3); + /// + /// // when submissions resolve, they are not removed from the set + /// // however, their `pending` signal is now `false`, and this can be used to filter them + /// # tokio::time::sleep(std::time::Duration::from_millis(100)).await; + /// assert_eq!(submissions.with(Vec::len), 3); + /// assert_eq!(pending_submissions(), 0); + /// # }); + /// ``` pub fn dispatch(&self, input: I) { if !is_suppressing_resource_load() { self.inner.with_value(|inner| inner.dispatch(input)); @@ -86,13 +191,72 @@ where /// Synchronously adds a submission with the given value. /// + /// This takes the output value, rather than the input, because it is adding a result, not an + /// input. + /// /// This can be useful for use cases like handling errors, where the error can already be known /// on the client side. + /// ```rust + /// # use reactive_graph::actions::*; + /// # use reactive_graph::prelude::*; + /// # tokio_test::block_on(async move { + /// # any_spawner::Executor::init_tokio(); + /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); + /// async fn send_new_todo_to_api(task: String) -> usize { + /// // do something... + /// // return a task id + /// 42 + /// } + /// let add_todo = MultiAction::new(|task: &String| { + /// // `task` is given as `&String` because its value is available in `input` + /// send_new_todo_to_api(task.clone()) + /// }); + /// + /// let submissions = add_todo.submissions(); + /// let pending_submissions = move || { + /// submissions.with(|subs| subs.iter().filter(|sub| sub.pending().get()).count()) + /// }; + /// + /// add_todo.dispatch("Buy milk".to_string()); + /// assert_eq!(submissions.with(Vec::len), 1); + /// assert_eq!(pending_submissions(), 1); + /// + /// add_todo.dispatch_sync(42); + /// + /// assert_eq!(submissions.with(Vec::len), 2); + /// assert_eq!(pending_submissions(), 1); + /// # }); + /// ``` pub fn dispatch_sync(&self, value: O) { self.inner.with_value(|inner| inner.dispatch_sync(value)); } /// The set of all submissions to this multi-action. + /// ```rust + /// # use reactive_graph::actions::*; + /// # use reactive_graph::prelude::*; + /// # tokio_test::block_on(async move { + /// # any_spawner::Executor::init_tokio(); + /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); + /// async fn send_new_todo_to_api(task: String) -> usize { + /// // do something... + /// // return a task id + /// 42 + /// } + /// let add_todo = MultiAction::new(|task: &String| { + /// // `task` is given as `&String` because its value is available in `input` + /// send_new_todo_to_api(task.clone()) + /// }); + /// + /// let submissions = add_todo.submissions(); + /// + /// add_todo.dispatch("Buy milk".to_string()); + /// add_todo.dispatch("???".to_string()); + /// add_todo.dispatch("Profit!!!".to_string()); + /// + /// assert_eq!(submissions.with(Vec::len), 3); + /// # }); + /// ``` pub fn submissions(&self) -> ReadSignal>> { self.inner .try_with_value(|inner| inner.submissions()) @@ -101,6 +265,35 @@ where } /// How many times an action has successfully resolved. + /// ```rust + /// # use reactive_graph::actions::*; + /// # use reactive_graph::prelude::*; + /// # tokio_test::block_on(async move { + /// # any_spawner::Executor::init_tokio(); + /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); + /// async fn send_new_todo_to_api(task: String) -> usize { + /// // do something... + /// // return a task id + /// 42 + /// } + /// let add_todo = MultiAction::new(|task: &String| { + /// // `task` is given as `&String` because its value is available in `input` + /// send_new_todo_to_api(task.clone()) + /// }); + /// + /// let version = add_todo.version(); + /// + /// add_todo.dispatch("Buy milk".to_string()); + /// add_todo.dispatch("???".to_string()); + /// add_todo.dispatch("Profit!!!".to_string()); + /// + /// assert_eq!(version.get(), 0); + /// # tokio::time::sleep(std::time::Duration::from_millis(100)).await; + /// + /// // when they've all resolved + /// assert_eq!(version.get(), 3); + /// # }); + /// ``` pub fn version(&self) -> RwSignal { self.inner .try_with_value(|inner| inner.version()) @@ -109,6 +302,43 @@ where } } +/// An action that synchronizes multiple imperative `async` calls to the reactive system, +/// tracking the progress of each one. +/// +/// Where an [`Action`](super::Action) fires a single call, a `MultiAction` allows you to +/// keep track of multiple in-flight actions. +/// +/// If you’re trying to load data by running an `async` function reactively, you probably +/// want to use an [`AsyncDerived`](crate::computed::AsyncDerived) instead. +/// If you’re trying to occasionally run an `async` function in response to something +/// like a user adding a task to a todo list, you’re in the right place. +/// +/// The arena-allocated, `Copy` version of an `ArcMultiAction` is a [`MultiAction`]. +/// +/// ```rust +/// # use reactive_graph::actions::*; +/// # use reactive_graph::prelude::*; +/// # tokio_test::block_on(async move { +/// # any_spawner::Executor::init_tokio(); +/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); +/// async fn send_new_todo_to_api(task: String) -> usize { +/// // do something... +/// // return a task id +/// 42 +/// } +/// let add_todo = ArcMultiAction::new(|task: &String| { +/// // `task` is given as `&String` because its value is available in `input` +/// send_new_todo_to_api(task.clone()) +/// }); +/// +/// add_todo.dispatch("Buy milk".to_string()); +/// add_todo.dispatch("???".to_string()); +/// add_todo.dispatch("Profit!!!".to_string()); +/// +/// let submissions = add_todo.submissions(); +/// assert_eq!(submissions.with(Vec::len), 3); +/// # }); +/// ``` pub struct ArcMultiAction where I: 'static, @@ -151,9 +381,36 @@ where impl ArcMultiAction where - I: 'static, - O: 'static, + I: Send + Sync + 'static, + O: Send + Sync + 'static, { + /// Creates a new multi-action. + /// + /// The input to the `async` function should always be a single value, + /// but it can be of any type. The argument is always passed by reference to the + /// function, because it is stored in [Submission::input] as well. + /// + /// ```rust + /// # use reactive_graph::actions::*; + /// # use reactive_graph::prelude::*; + /// # tokio_test::block_on(async move { + /// # any_spawner::Executor::init_tokio(); + /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); + /// // if there's a single argument, just use that + /// let action1 = ArcMultiAction::new(|input: &String| { + /// let input = input.clone(); + /// async move { todo!() } + /// }); + /// + /// // if there are no arguments, use the unit type `()` + /// let action2 = ArcMultiAction::new(|input: &()| async { todo!() }); + /// + /// // if there are multiple arguments, use a tuple + /// let action3 = + /// ArcMultiAction::new(|input: &(usize, String)| async { todo!() }); + /// # }); + /// ``` + #[track_caller] pub fn new( action_fn: impl Fn(&I) -> Fut + Send + Sync + 'static, ) -> Self @@ -172,6 +429,51 @@ where } /// Calls the `async` function with a reference to the input type as its argument. + /// + /// This can be called any number of times: each submission will be dispatched, running + /// concurrently, and its status can be checked via the + /// [`submissions()`](MultiAction::submissions) signal. + /// ```rust + /// # use reactive_graph::actions::*; + /// # use reactive_graph::prelude::*; + /// # tokio_test::block_on(async move { + /// # any_spawner::Executor::init_tokio(); + /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); + /// async fn send_new_todo_to_api(task: String) -> usize { + /// // do something... + /// // return a task id + /// 42 + /// } + /// let add_todo = ArcMultiAction::new(|task: &String| { + /// // `task` is given as `&String` because its value is available in `input` + /// send_new_todo_to_api(task.clone()) + /// }); + /// + /// let submissions = add_todo.submissions(); + /// let pending_submissions = { + /// let submissions = submissions.clone(); + /// move || { + /// submissions.with(|subs| subs.iter().filter(|sub| sub.pending().get()).count()) + /// } + /// }; + /// + /// add_todo.dispatch("Buy milk".to_string()); + /// assert_eq!(submissions.with(Vec::len), 1); + /// assert_eq!(pending_submissions(), 1); + /// + /// add_todo.dispatch("???".to_string()); + /// add_todo.dispatch("Profit!!!".to_string()); + /// + /// assert_eq!(submissions.with(Vec::len), 3); + /// assert_eq!(pending_submissions(), 3); + /// + /// // when submissions resolve, they are not removed from the set + /// // however, their `pending` signal is now `false`, and this can be used to filter them + /// # tokio::time::sleep(std::time::Duration::from_millis(100)).await; + /// assert_eq!(submissions.with(Vec::len), 3); + /// assert_eq!(pending_submissions(), 0); + /// # }); + /// ``` pub fn dispatch(&self, input: I) { if !is_suppressing_resource_load() { let fut = (self.action_fn)(&input); @@ -188,7 +490,7 @@ where let version = self.version.clone(); - Executor::spawn_local(async move { + Executor::spawn(async move { let new_value = fut.await; let canceled = submission.canceled.get_untracked(); if !canceled { @@ -203,13 +505,50 @@ where /// Synchronously adds a submission with the given value. /// + /// This takes the output value, rather than the input, because it is adding a result, not an + /// input. + /// /// This can be useful for use cases like handling errors, where the error can already be known /// on the client side. + /// ```rust + /// # use reactive_graph::actions::*; + /// # use reactive_graph::prelude::*; + /// # tokio_test::block_on(async move { + /// # any_spawner::Executor::init_tokio(); + /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); + /// async fn send_new_todo_to_api(task: String) -> usize { + /// // do something... + /// // return a task id + /// 42 + /// } + /// let add_todo = ArcMultiAction::new(|task: &String| { + /// // `task` is given as `&String` because its value is available in `input` + /// send_new_todo_to_api(task.clone()) + /// }); + /// + /// let submissions = add_todo.submissions(); + /// let pending_submissions = { + /// let submissions = submissions.clone(); + /// move || { + /// submissions.with(|subs| subs.iter().filter(|sub| sub.pending().get()).count()) + /// } + /// }; + /// + /// add_todo.dispatch("Buy milk".to_string()); + /// assert_eq!(submissions.with(Vec::len), 1); + /// assert_eq!(pending_submissions(), 1); + /// + /// add_todo.dispatch_sync(42); + /// + /// assert_eq!(submissions.with(Vec::len), 2); + /// assert_eq!(pending_submissions(), 1); + /// # }); + /// ``` pub fn dispatch_sync(&self, value: O) { let submission = ArcSubmission { input: ArcRwSignal::new(None), value: ArcRwSignal::new(Some(value)), - pending: ArcRwSignal::new(true), + pending: ArcRwSignal::new(false), canceled: ArcRwSignal::new(false), }; @@ -219,11 +558,65 @@ where } /// The set of all submissions to this multi-action. + /// ```rust + /// # use reactive_graph::actions::*; + /// # use reactive_graph::prelude::*; + /// # tokio_test::block_on(async move { + /// # any_spawner::Executor::init_tokio(); + /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); + /// async fn send_new_todo_to_api(task: String) -> usize { + /// // do something... + /// // return a task id + /// 42 + /// } + /// let add_todo = ArcMultiAction::new(|task: &String| { + /// // `task` is given as `&String` because its value is available in `input` + /// send_new_todo_to_api(task.clone()) + /// }); + /// + /// let submissions = add_todo.submissions(); + /// + /// add_todo.dispatch("Buy milk".to_string()); + /// add_todo.dispatch("???".to_string()); + /// add_todo.dispatch("Profit!!!".to_string()); + /// + /// assert_eq!(submissions.with(Vec::len), 3); + /// # }); + /// ``` pub fn submissions(&self) -> ArcReadSignal>> { self.submissions.read_only() } /// How many times an action has successfully resolved. + /// ```rust + /// # use reactive_graph::actions::*; + /// # use reactive_graph::prelude::*; + /// # tokio_test::block_on(async move { + /// # any_spawner::Executor::init_tokio(); + /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter(); + /// async fn send_new_todo_to_api(task: String) -> usize { + /// // do something... + /// // return a task id + /// 42 + /// } + /// let add_todo = ArcMultiAction::new(|task: &String| { + /// // `task` is given as `&String` because its value is available in `input` + /// send_new_todo_to_api(task.clone()) + /// }); + /// + /// let version = add_todo.version(); + /// + /// add_todo.dispatch("Buy milk".to_string()); + /// add_todo.dispatch("???".to_string()); + /// add_todo.dispatch("Profit!!!".to_string()); + /// + /// assert_eq!(version.get(), 0); + /// # tokio::time::sleep(std::time::Duration::from_millis(100)).await; + /// + /// // when they've all resolved + /// assert_eq!(version.get(), 3); + /// # }); + /// ``` pub fn version(&self) -> ArcRwSignal { self.version.clone() } @@ -251,23 +644,37 @@ where I: 'static, O: 'static, { + /// The current argument that was dispatched to the `async` function. + /// `Some` while we are waiting for it to resolve, `None` if it has resolved. + #[track_caller] pub fn input(&self) -> ArcReadSignal> { self.input.read_only() } + /// The most recent return value of the `async` function. + #[track_caller] pub fn value(&self) -> ArcReadSignal> { self.value.read_only() } + /// Whether this submision is still waiting to resolve. + #[track_caller] pub fn pending(&self) -> ArcReadSignal { self.pending.read_only() } + /// Whether this submission has been canceled. + #[track_caller] pub fn canceled(&self) -> ArcReadSignal { self.canceled.read_only() } + /// Cancels the submission. This will not necessarily prevent the `Future` + /// from continuing to run, but it will update the returned value. + #[track_caller] pub fn cancel(&self) { + // TODO if we set these up to race against a cancel signal, we could actually drop the + // futures self.canceled.try_set(true); } } @@ -325,22 +732,34 @@ where I: Send + Sync + 'static, O: Send + Sync + 'static, { + /// The current argument that was dispatched to the `async` function. + /// `Some` while we are waiting for it to resolve, `None` if it has resolved. + #[track_caller] pub fn input(&self) -> ReadSignal> { self.input.read_only() } + /// The most recent return value of the `async` function. + #[track_caller] pub fn value(&self) -> ReadSignal> { self.value.read_only() } + /// Whether this submision is still waiting to resolve. + #[track_caller] pub fn pending(&self) -> ReadSignal { self.pending.read_only() } + /// Whether this submission has been canceled. + #[track_caller] pub fn canceled(&self) -> ReadSignal { self.canceled.read_only() } + /// Cancels the submission. This will not necessarily prevent the `Future` + /// from continuing to run, but it will update the returned value. + #[track_caller] pub fn cancel(&self) { self.canceled.try_set(true); }