fix: set `None` observer properly in `ScopedFuture`
This commit is contained in:
parent
d24f97b59f
commit
a2385e4c42
|
@ -9,7 +9,7 @@ use crate::{
|
||||||
diagnostics::SpecialNonReactiveFuture,
|
diagnostics::SpecialNonReactiveFuture,
|
||||||
graph::{
|
graph::{
|
||||||
AnySource, AnySubscriber, ReactiveNode, Source, SourceSet, Subscriber,
|
AnySource, AnySubscriber, ReactiveNode, Source, SourceSet, Subscriber,
|
||||||
SubscriberSet, ToAnySource, ToAnySubscriber,
|
SubscriberSet, ToAnySource, ToAnySubscriber, WithObserver,
|
||||||
},
|
},
|
||||||
owner::{use_context, Owner},
|
owner::{use_context, Owner},
|
||||||
signal::guards::{AsyncPlain, ReadGuard},
|
signal::guards::{AsyncPlain, ReadGuard},
|
||||||
|
|
|
@ -5,7 +5,7 @@ mod async_derived;
|
||||||
mod future_impls;
|
mod future_impls;
|
||||||
mod inner;
|
mod inner;
|
||||||
use crate::{
|
use crate::{
|
||||||
graph::{AnySubscriber, Observer},
|
graph::{AnySubscriber, Observer, WithObserver},
|
||||||
owner::Owner,
|
owner::Owner,
|
||||||
};
|
};
|
||||||
pub use async_derived::*;
|
pub use async_derived::*;
|
||||||
|
@ -23,7 +23,7 @@ pin_project! {
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
#[allow(missing_docs)]
|
#[allow(missing_docs)]
|
||||||
pub struct ScopedFuture<Fut> {
|
pub struct ScopedFuture<Fut> {
|
||||||
pub owner: Option<Owner>,
|
pub owner: Owner,
|
||||||
pub observer: Option<AnySubscriber>,
|
pub observer: Option<AnySubscriber>,
|
||||||
#[pin]
|
#[pin]
|
||||||
pub fut: Fut,
|
pub fut: Fut,
|
||||||
|
@ -34,7 +34,7 @@ impl<Fut> ScopedFuture<Fut> {
|
||||||
/// Wraps the given `Future` by taking the current [`Owner`] and [`Observer`] and re-setting
|
/// Wraps the given `Future` by taking the current [`Owner`] and [`Observer`] and re-setting
|
||||||
/// them as the active owner and observer every time the inner `Future` is polled.
|
/// them as the active owner and observer every time the inner `Future` is polled.
|
||||||
pub fn new(fut: Fut) -> Self {
|
pub fn new(fut: Fut) -> Self {
|
||||||
let owner = Owner::current();
|
let owner = Owner::current().unwrap_or_default();
|
||||||
let observer = Observer::get();
|
let observer = Observer::get();
|
||||||
Self {
|
Self {
|
||||||
owner,
|
owner,
|
||||||
|
@ -49,14 +49,8 @@ impl<Fut: Future> Future for ScopedFuture<Fut> {
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
let this = self.project();
|
let this = self.project();
|
||||||
match (this.owner, this.observer) {
|
this.owner
|
||||||
(None, None) => this.fut.poll(cx),
|
.with(|| this.observer.with_observer(|| this.fut.poll(cx)))
|
||||||
(None, Some(obs)) => obs.with_observer(|| this.fut.poll(cx)),
|
|
||||||
(Some(owner), None) => owner.with(|| this.fut.poll(cx)),
|
|
||||||
(Some(owner), Some(observer)) => {
|
|
||||||
owner.with(|| observer.with_observer(|| this.fut.poll(cx)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
graph::{
|
graph::{
|
||||||
AnySource, AnySubscriber, Observer, ReactiveNode, ReactiveNodeState,
|
AnySource, AnySubscriber, Observer, ReactiveNode, ReactiveNodeState,
|
||||||
Source, SourceSet, Subscriber, SubscriberSet,
|
Source, SourceSet, Subscriber, SubscriberSet, WithObserver,
|
||||||
},
|
},
|
||||||
owner::Owner,
|
owner::Owner,
|
||||||
};
|
};
|
||||||
|
|
|
@ -3,6 +3,7 @@ use crate::{
|
||||||
effect::inner::EffectInner,
|
effect::inner::EffectInner,
|
||||||
graph::{
|
graph::{
|
||||||
AnySubscriber, ReactiveNode, SourceSet, Subscriber, ToAnySubscriber,
|
AnySubscriber, ReactiveNode, SourceSet, Subscriber, ToAnySubscriber,
|
||||||
|
WithObserver,
|
||||||
},
|
},
|
||||||
owner::{Owner, StoredValue},
|
owner::{Owner, StoredValue},
|
||||||
traits::Dispose,
|
traits::Dispose,
|
||||||
|
|
|
@ -3,6 +3,7 @@ use crate::{
|
||||||
effect::inner::EffectInner,
|
effect::inner::EffectInner,
|
||||||
graph::{
|
graph::{
|
||||||
AnySubscriber, ReactiveNode, SourceSet, Subscriber, ToAnySubscriber,
|
AnySubscriber, ReactiveNode, SourceSet, Subscriber, ToAnySubscriber,
|
||||||
|
WithObserver,
|
||||||
},
|
},
|
||||||
owner::Owner,
|
owner::Owner,
|
||||||
};
|
};
|
||||||
|
|
|
@ -13,6 +13,7 @@ thread_local! {
|
||||||
/// subscribe to changes in any signals that are read.
|
/// subscribe to changes in any signals that are read.
|
||||||
pub struct Observer;
|
pub struct Observer;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
struct SetObserverOnDrop(Option<AnySubscriber>);
|
struct SetObserverOnDrop(Option<AnySubscriber>);
|
||||||
|
|
||||||
impl Drop for SetObserverOnDrop {
|
impl Drop for SetObserverOnDrop {
|
||||||
|
@ -39,10 +40,9 @@ impl Observer {
|
||||||
OBSERVER.with_borrow_mut(|o| *o = observer);
|
OBSERVER.with_borrow_mut(|o| *o = observer);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn replace(observer: AnySubscriber) -> SetObserverOnDrop {
|
fn replace(observer: Option<AnySubscriber>) -> SetObserverOnDrop {
|
||||||
SetObserverOnDrop(
|
SetObserverOnDrop(
|
||||||
OBSERVER
|
OBSERVER.with(|o| mem::replace(&mut *o.borrow_mut(), observer)),
|
||||||
.with(|o| mem::replace(&mut *o.borrow_mut(), Some(observer))),
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -75,6 +75,7 @@ impl Observer {
|
||||||
/// assert_eq!(c.get(), 3);
|
/// assert_eq!(c.get(), 3);
|
||||||
/// # });
|
/// # });
|
||||||
/// ```
|
/// ```
|
||||||
|
#[track_caller]
|
||||||
pub fn untrack<T>(fun: impl FnOnce() -> T) -> T {
|
pub fn untrack<T>(fun: impl FnOnce() -> T) -> T {
|
||||||
#[cfg(debug_assertions)]
|
#[cfg(debug_assertions)]
|
||||||
let _warning_guard = crate::diagnostics::SpecialNonReactiveZone::enter();
|
let _warning_guard = crate::diagnostics::SpecialNonReactiveZone::enter();
|
||||||
|
@ -150,9 +151,23 @@ impl ReactiveNode for AnySubscriber {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AnySubscriber {
|
/// Runs code with some subscriber as the thread-local [`Observer`].
|
||||||
|
pub trait WithObserver {
|
||||||
/// Runs the given function with this subscriber as the thread-local [`Observer`].
|
/// Runs the given function with this subscriber as the thread-local [`Observer`].
|
||||||
pub fn with_observer<T>(&self, fun: impl FnOnce() -> T) -> T {
|
fn with_observer<T>(&self, fun: impl FnOnce() -> T) -> T;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WithObserver for AnySubscriber {
|
||||||
|
/// Runs the given function with this subscriber as the thread-local [`Observer`].
|
||||||
|
fn with_observer<T>(&self, fun: impl FnOnce() -> T) -> T {
|
||||||
|
let _prev = Observer::replace(Some(self.clone()));
|
||||||
|
fun()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WithObserver for Option<AnySubscriber> {
|
||||||
|
/// Runs the given function with this subscriber as the thread-local [`Observer`].
|
||||||
|
fn with_observer<T>(&self, fun: impl FnOnce() -> T) -> T {
|
||||||
let _prev = Observer::replace(self.clone());
|
let _prev = Observer::replace(self.clone());
|
||||||
fun()
|
fun()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue