This commit is contained in:
parent
8c959d3e24
commit
84457b45ff
|
@ -83,7 +83,8 @@ where
|
||||||
move || fetcher(source.get())
|
move || fetcher(source.get())
|
||||||
};
|
};
|
||||||
|
|
||||||
let data = ArcAsyncDerived::new_with_initial(initial, fun);
|
let data =
|
||||||
|
ArcAsyncDerived::new_with_initial_without_spawning(initial, fun);
|
||||||
if is_ready {
|
if is_ready {
|
||||||
source.with(|_| ());
|
source.with(|_| ());
|
||||||
source.add_subscriber(data.to_any_subscriber());
|
source.add_subscriber(data.to_any_subscriber());
|
||||||
|
|
|
@ -195,10 +195,10 @@ impl<T> DefinedAt for ArcAsyncDerived<T> {
|
||||||
// whether `fun` returns a `Future` that is `Send`. Doing it as a function would,
|
// whether `fun` returns a `Future` that is `Send`. Doing it as a function would,
|
||||||
// as far as I can tell, require repeating most of the function body.
|
// as far as I can tell, require repeating most of the function body.
|
||||||
macro_rules! spawn_derived {
|
macro_rules! spawn_derived {
|
||||||
($spawner:expr, $initial:ident, $fun:ident, $should_spawn:literal) => {{
|
($spawner:expr, $initial:ident, $fun:ident, $should_spawn:literal, $force_spawn:literal) => {{
|
||||||
let (notifier, mut rx) = channel();
|
let (notifier, mut rx) = channel();
|
||||||
|
|
||||||
let is_ready = $initial.is_some();
|
let is_ready = $initial.is_some() && !$force_spawn;
|
||||||
|
|
||||||
let owner = Owner::new();
|
let owner = Owner::new();
|
||||||
let inner = Arc::new(RwLock::new(ArcAsyncDerivedInner {
|
let inner = Arc::new(RwLock::new(ArcAsyncDerivedInner {
|
||||||
|
@ -344,9 +344,8 @@ impl<T: 'static> ArcAsyncDerived<T> {
|
||||||
Self::new_with_initial(None, fun)
|
Self::new_with_initial(None, fun)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates a new async derived computation with an initial value.
|
/// Creates a new async derived computation with an initial value as a fallback, and begins running the
|
||||||
///
|
/// `Future` eagerly to get the actual first value.
|
||||||
/// If the initial value is `Some(_)`, the task will not be run initially.
|
|
||||||
#[track_caller]
|
#[track_caller]
|
||||||
pub fn new_with_initial<Fut>(
|
pub fn new_with_initial<Fut>(
|
||||||
initial_value: Option<T>,
|
initial_value: Option<T>,
|
||||||
|
@ -357,7 +356,27 @@ impl<T: 'static> ArcAsyncDerived<T> {
|
||||||
Fut: Future<Output = T> + Send + 'static,
|
Fut: Future<Output = T> + Send + 'static,
|
||||||
{
|
{
|
||||||
let (this, _) =
|
let (this, _) =
|
||||||
spawn_derived!(Executor::spawn, initial_value, fun, true);
|
spawn_derived!(Executor::spawn, initial_value, fun, true, true);
|
||||||
|
this
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Creates a new async derived computation with an initial value, and does not spawn a task
|
||||||
|
/// initially.
|
||||||
|
///
|
||||||
|
/// This is mostly used with manual dependency tracking, for primitives built on top of this
|
||||||
|
/// where you do not want to run the run the `Future` unnecessarily.
|
||||||
|
#[doc(hidden)]
|
||||||
|
#[track_caller]
|
||||||
|
pub fn new_with_initial_without_spawning<Fut>(
|
||||||
|
initial_value: Option<T>,
|
||||||
|
fun: impl Fn() -> Fut + Send + Sync + 'static,
|
||||||
|
) -> Self
|
||||||
|
where
|
||||||
|
T: Send + Sync + 'static,
|
||||||
|
Fut: Future<Output = T> + Send + 'static,
|
||||||
|
{
|
||||||
|
let (this, _) =
|
||||||
|
spawn_derived!(Executor::spawn, initial_value, fun, true, false);
|
||||||
this
|
this
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -375,10 +394,8 @@ impl<T: 'static> ArcAsyncDerived<T> {
|
||||||
Self::new_unsync_with_initial(None, fun)
|
Self::new_unsync_with_initial(None, fun)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates a new async derived computation with an initial value. Async work will be
|
/// Creates a new async derived computation with an initial value as a fallback, and begins running the
|
||||||
/// guaranteed to run only on the current thread.
|
/// `Future` eagerly to get the actual first value.
|
||||||
///
|
|
||||||
/// If the initial value is `Some(_)`, the task will not be run initially.
|
|
||||||
#[track_caller]
|
#[track_caller]
|
||||||
pub fn new_unsync_with_initial<Fut>(
|
pub fn new_unsync_with_initial<Fut>(
|
||||||
initial_value: Option<T>,
|
initial_value: Option<T>,
|
||||||
|
@ -388,8 +405,13 @@ impl<T: 'static> ArcAsyncDerived<T> {
|
||||||
T: 'static,
|
T: 'static,
|
||||||
Fut: Future<Output = T> + 'static,
|
Fut: Future<Output = T> + 'static,
|
||||||
{
|
{
|
||||||
let (this, _) =
|
let (this, _) = spawn_derived!(
|
||||||
spawn_derived!(Executor::spawn_local, initial_value, fun, true);
|
Executor::spawn_local,
|
||||||
|
initial_value,
|
||||||
|
fun,
|
||||||
|
true,
|
||||||
|
true
|
||||||
|
);
|
||||||
this
|
this
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -402,7 +424,7 @@ impl<T: 'static> ArcAsyncDerived<T> {
|
||||||
{
|
{
|
||||||
let initial = None::<T>;
|
let initial = None::<T>;
|
||||||
let (this, _) =
|
let (this, _) =
|
||||||
spawn_derived!(Executor::spawn_local, initial, fun, false);
|
spawn_derived!(Executor::spawn_local, initial, fun, false, false);
|
||||||
this
|
this
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -90,3 +90,35 @@ async fn read_signal_traits_on_arena() {
|
||||||
assert_eq!(value.with(|n| *n), None);
|
assert_eq!(value.with(|n| *n), None);
|
||||||
assert_eq!(value.get(), None);
|
assert_eq!(value.get(), None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn async_derived_with_initial() {
|
||||||
|
_ = Executor::init_tokio();
|
||||||
|
|
||||||
|
let signal1 = RwSignal::new(0);
|
||||||
|
let signal2 = RwSignal::new(0);
|
||||||
|
let derived =
|
||||||
|
ArcAsyncDerived::new_with_initial(Some(5), move || async move {
|
||||||
|
// reactive values can be tracked anywhere in the `async` block
|
||||||
|
let value1 = signal1.get();
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(25)).await;
|
||||||
|
let value2 = signal2.get();
|
||||||
|
|
||||||
|
value1 + value2
|
||||||
|
});
|
||||||
|
|
||||||
|
// the value can be accessed synchronously as `Option<T>`
|
||||||
|
assert_eq!(derived.get(), Some(5));
|
||||||
|
// we can also .await the value, i.e., convert it into a Future
|
||||||
|
assert_eq!(derived.clone().await, 0);
|
||||||
|
assert_eq!(derived.get(), Some(0));
|
||||||
|
|
||||||
|
signal1.set(1);
|
||||||
|
// while the new value is still pending, the signal holds the old value
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
|
||||||
|
assert_eq!(derived.get(), Some(0));
|
||||||
|
|
||||||
|
// setting multiple dependencies will hold until the latest change is ready
|
||||||
|
signal2.set(1);
|
||||||
|
assert_eq!(derived.await, 2);
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue