feat: return an async guard from .await rather than cloning the value every time

This commit is contained in:
Greg Johnston 2024-05-31 11:39:32 -04:00
parent 6003212f6e
commit 846ff2fefb
12 changed files with 209 additions and 91 deletions

View File

@ -87,17 +87,19 @@ pub fn fetch_example() -> impl IntoView {
<ul>
{move || Suspend(async move {
cats.await
.as_ref()
.map(|cats| {
cats.into_iter()
cats.iter()
.map(|s| {
view! {
<li>
<img src=s/>
<img src=s.clone()/>
</li>
}
})
.collect::<Vec<_>>()
})
.map_err(Clone::clone)
})}
</ul>

View File

@ -30,7 +30,6 @@ pub fn Stories() -> impl IntoView {
params
.read()
.get("stories")
.map(ToOwned::to_owned)
.unwrap_or_else(|| "top".to_string())
};
let stories = Resource::new_serde(
@ -44,7 +43,12 @@ pub fn Stories() -> impl IntoView {
let hide_more_link = move || {
Suspend(async move {
stories.await.unwrap_or_default().len() < 28 || pending.get()
stories
.await
.as_ref()
.map(|vec| vec.len() < 28)
.unwrap_or(true)
|| pending.get()
})
};
@ -91,16 +95,19 @@ pub fn Stories() -> impl IntoView {
// TODO set_pending on Transition
//set_pending
>
{move || Suspend(async move { match stories.await {
None => Either::Left(view! { <p>"Error loading stories."</p> }),
Some(stories) => {
Either::Right(view! {
<ul>
{stories.into_iter().map(|story| view! { <Story story/> }).collect::<Vec<_>>()}
</ul>
})
}
}})}
<Show when=move || stories.read().as_ref().map(Option::is_none).unwrap_or(false)>
>
<p>"Error loading stories."</p>
</Show>
<ul>
<For
each=move || stories.get().unwrap_or_default().unwrap_or_default()
key=|story| story.id
let:story
>
<Story story/>
</For>
</ul>
</Transition>
</div>
</main>

View File

@ -9,13 +9,7 @@ use leptos_router::hooks::use_params_map;
pub fn Story() -> impl IntoView {
let params = use_params_map();
let story = Resource::new_serde(
move || {
params
.get()
.get("id")
.map(ToOwned::to_owned)
.unwrap_or_default()
},
move || params.read().get("id").unwrap_or_default(),
move |id| async move {
if id.is_empty() {
None
@ -27,7 +21,7 @@ pub fn Story() -> impl IntoView {
);
Suspense(SuspenseProps::builder().fallback(|| "Loading...").children(ToChildren::to_children(move || Suspend(async move {
match story.await {
match story.await.clone() {
None => Either::Left("Story not found."),
Some(story) => {
Either::Right(view! {

View File

@ -7,13 +7,7 @@ use leptos_router::{hooks::use_params_map, *};
pub fn User() -> impl IntoView {
let params = use_params_map();
let user = Resource::new_serde(
move || {
params
.read()
.get("id")
.map(ToOwned::to_owned)
.unwrap_or_default()
},
move || params.read().get("id").unwrap_or_default(),
move |id| async move {
if id.is_empty() {
None
@ -25,7 +19,7 @@ pub fn User() -> impl IntoView {
view! {
<div class="user-view">
<Suspense fallback=|| view! { "Loading..." }>
{move || Suspend(async move { match user.await {
{move || Suspend(async move { match user.await.clone() {
None => Either::Left(view! { <h1>"User not found."</h1> }),
Some(user) => Either::Right(view! {
<div>

View File

@ -60,7 +60,7 @@ fn HomePage() -> impl IntoView {
let posts2 = Resource::new_serde(|| (), |_| list_post_metadata());
let posts2 = Resource::new(
|| (),
move |_| async move { posts2.await.unwrap_or_default().len() },
move |_| async move { posts2.await.as_ref().map(Vec::len).unwrap_or(0) },
);
/*let posts_view = Suspend(async move {
@ -80,7 +80,7 @@ fn HomePage() -> impl IntoView {
view! {
<h1>"My Great Blog"</h1>
<Suspense fallback=move || view! { <p>"Loading posts..."</p> }>
<p>"number of posts: " {Suspend(posts2.into_future())}</p>
<p>"number of posts: " {Suspend(async move { *posts2.await })}</p>
</Suspense>
<Suspense fallback=move || view! { <p>"Loading posts..."</p> }>
//<ul>{posts_view}</ul>
@ -119,7 +119,7 @@ fn Post() -> impl IntoView {
});
let post_view = Suspend(async move {
match post_resource.await {
match post_resource.await.to_owned() {
Ok(Ok(post)) => Ok(view! {
<h1>{post.title.clone()}</h1>
<p>{post.content.clone()}</p>
@ -204,7 +204,7 @@ pub struct PostMetadata {
#[server]
pub async fn list_post_metadata() -> Result<Vec<PostMetadata>, ServerFnError> {
//tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Ok(POSTS
.iter()
.map(|data| PostMetadata {
@ -216,6 +216,6 @@ pub async fn list_post_metadata() -> Result<Vec<PostMetadata>, ServerFnError> {
#[server]
pub async fn get_post(id: usize) -> Result<Option<Post>, ServerFnError> {
//tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Ok(POSTS.iter().find(|post| post.id == id).cloned())
}

View File

@ -117,6 +117,38 @@ pub fn Todos() -> impl IntoView {
move |_| get_todos(),
);
let existing_todos = move || {
Suspend(async move {
todos
.await
.as_ref()
.map(|todos| {
if todos.is_empty() {
Either::Left(view! { <p>"No tasks were found."</p> })
} else {
Either::Right(
todos
.iter()
.map(move |todo| {
let id = todo.id;
view! {
<li>
{todo.title.clone()}
<ActionForm action=delete_todo>
<input type="hidden" name="id" value=id/>
<input type="submit" value="X"/>
</ActionForm>
</li>
}
})
.collect::<Vec<_>>(),
)
}
})
.map_err(Clone::clone)
})
};
view! {
<MultiActionForm action=add_todo>
<label>"Add a Todo" <input type="text" name="title"/></label>
@ -125,36 +157,8 @@ pub fn Todos() -> impl IntoView {
<div>
<Transition fallback=move || view! { <p>"Loading..."</p> }>
<ErrorBoundary fallback=|errors| view! { <ErrorTemplate errors/> }>
// {existing_todos}
<ul>
{move || {
async move {
todos
.await
.map(|todos| {
if todos.is_empty() {
Either::Left(view! { <p>"No tasks were found."</p> })
} else {
Either::Right(
todos
.into_iter()
.map(move |todo| {
view! {
<li>
{todo.title} <ActionForm action=delete_todo>
<input type="hidden" name="id" value=todo.id/>
<input type="submit" value="X"/>
</ActionForm>
</li>
}
})
.collect::<Vec<_>>(),
)
}
})
}
.wait()
}}
{existing_todos}
{move || {
submissions
.get()

View File

@ -13,11 +13,12 @@ use hydration_context::SerializedDataId;
use reactive_graph::{
computed::{
ArcAsyncDerived, ArcAsyncDerivedFuture, ArcMemo, AsyncDerived,
AsyncDerivedFuture,
AsyncDerivedFuture, AsyncDerivedGuard,
},
graph::{Source, ToAnySource, ToAnySubscriber},
owner::Owner,
prelude::*,
signal::guards::{AsyncPlain, Mapped, ReadGuard},
};
use std::{future::IntoFuture, ops::Deref};
@ -242,7 +243,7 @@ impl<T, Ser> IntoFuture for ArcResource<T, Ser>
where
T: Clone + 'static,
{
type Output = T;
type Output = AsyncDerivedGuard<T>;
type IntoFuture = ArcAsyncDerivedFuture<T>;
fn into_future(self) -> Self::IntoFuture {
@ -431,7 +432,7 @@ impl<T, Ser> IntoFuture for Resource<T, Ser>
where
T: Clone + Send + Sync + 'static,
{
type Output = T;
type Output = AsyncDerivedGuard<T>;
type IntoFuture = AsyncDerivedFuture<T>;
#[track_caller]

View File

@ -12,11 +12,12 @@ use crate::{
SubscriberSet, ToAnySource, ToAnySubscriber,
},
owner::{use_context, Owner},
signal::guards::{Plain, ReadGuard},
signal::guards::{AsyncPlain, Plain, ReadGuard},
traits::{DefinedAt, ReadUntracked},
transition::AsyncTransition,
};
use any_spawner::Executor;
use async_lock::RwLock as AsyncRwLock;
use core::fmt::Debug;
use futures::{channel::oneshot, FutureExt, StreamExt};
use or_poisoned::OrPoisoned;
@ -35,13 +36,59 @@ pub struct ArcAsyncDerived<T> {
#[cfg(debug_assertions)]
pub(crate) defined_at: &'static Location<'static>,
// the current state of this signal
pub(crate) value: Arc<RwLock<Option<T>>>,
pub(crate) value: Arc<AsyncRwLock<Option<T>>>,
// holds wakers generated when you .await this
pub(crate) wakers: Arc<RwLock<Vec<Waker>>>,
pub(crate) inner: Arc<RwLock<ArcAsyncDerivedInner>>,
pub(crate) loading: Arc<AtomicBool>,
}
pub(crate) trait BlockingLock<T> {
fn blocking_read_arc(self: &Arc<Self>)
-> async_lock::RwLockReadGuardArc<T>;
fn blocking_read(&self) -> async_lock::RwLockReadGuard<'_, T>;
fn blocking_write(&self) -> async_lock::RwLockWriteGuard<'_, T>;
}
impl<T> BlockingLock<T> for AsyncRwLock<T> {
fn blocking_read_arc(
self: &Arc<Self>,
) -> async_lock::RwLockReadGuardArc<T> {
#[cfg(not(target_family = "wasm"))]
{
self.read_arc_blocking()
}
#[cfg(target_family = "wasm")]
{
self.read_arc().now_or_never().unwrap()
}
}
fn blocking_read(&self) -> async_lock::RwLockReadGuard<'_, T> {
#[cfg(not(target_family = "wasm"))]
{
self.read_blocking()
}
#[cfg(target_family = "wasm")]
{
self.read().now_or_never().unwrap()
}
}
fn blocking_write(&self) -> async_lock::RwLockWriteGuard<'_, T> {
#[cfg(not(target_family = "wasm"))]
{
self.write_blocking()
}
#[cfg(target_family = "wasm")]
{
self.write().now_or_never().unwrap()
}
}
}
impl<T> Clone for ArcAsyncDerived<T> {
fn clone(&self) -> Self {
Self {
@ -96,7 +143,7 @@ macro_rules! spawn_derived {
subscribers: SubscriberSet::new(),
dirty: false
}));
let value = Arc::new(RwLock::new($initial));
let value = Arc::new(AsyncRwLock::new($initial));
let wakers = Arc::new(RwLock::new(Vec::new()));
let this = ArcAsyncDerived {
@ -129,7 +176,7 @@ macro_rules! spawn_derived {
let mut guard = this.inner.write().or_poisoned();
guard.dirty = false;
*value.write().or_poisoned() = Some(orig_value);
*value.blocking_write() = Some(orig_value);
this.loading.store(false, Ordering::Relaxed);
true
}
@ -185,7 +232,7 @@ macro_rules! spawn_derived {
// generate and assign new value
let new_value = fut.await;
loading.store(false, Ordering::Relaxed);
*value.write().or_poisoned() = Some(new_value);
*value.write().await = Some(new_value);
inner.write().or_poisoned().dirty = true;
ready_tx.send(());
@ -271,11 +318,11 @@ impl<T: 'static> ArcAsyncDerived<T> {
}
impl<T: Send + Sync + 'static> ReadUntracked for ArcAsyncDerived<T> {
type Value = ReadGuard<Option<T>, Plain<Option<T>>>;
type Value = ReadGuard<Option<T>, AsyncPlain<Option<T>>>;
fn try_read_untracked(&self) -> Option<Self::Value> {
if let Some(suspense_context) = use_context::<SuspenseContext>() {
if self.value.read().or_poisoned().is_none() {
if self.value.blocking_read().is_none() {
let handle = suspense_context.task_id();
let ready = SpecialNonReactiveFuture::new(self.ready());
Executor::spawn(async move {
@ -284,7 +331,7 @@ impl<T: Send + Sync + 'static> ReadUntracked for ArcAsyncDerived<T> {
});
}
}
Plain::try_new(Arc::clone(&self.value)).map(ReadGuard::new)
AsyncPlain::try_new(&self.value).map(ReadGuard::new)
}
}

View File

@ -7,7 +7,7 @@ use crate::{
ToAnySource, ToAnySubscriber,
},
owner::StoredValue,
signal::guards::{Plain, ReadGuard},
signal::guards::{AsyncPlain, Plain, ReadGuard},
traits::{DefinedAt, Dispose, ReadUntracked},
unwrap_signal,
};
@ -142,7 +142,7 @@ impl<T: Send + Sync + 'static> DefinedAt for AsyncDerived<T> {
}
impl<T: Send + Sync + 'static> ReadUntracked for AsyncDerived<T> {
type Value = ReadGuard<Option<T>, Plain<Option<T>>>;
type Value = ReadGuard<Option<T>, AsyncPlain<Option<T>>>;
fn try_read_untracked(&self) -> Option<Self::Value> {
self.inner.get().map(|inner| inner.read_untracked())

View File

@ -2,10 +2,11 @@ use super::{suspense::SuspenseContext, ArcAsyncDerived, AsyncDerived};
use crate::{
graph::{AnySource, ToAnySource},
owner::use_context,
signal::guards::Plain,
signal::guards::{AsyncPlain, Mapped, Plain, ReadGuard},
traits::{DefinedAt, Track},
unwrap_signal,
};
use futures::pin_mut;
use or_poisoned::OrPoisoned;
use pin_project_lite::pin_project;
use std::{
@ -18,6 +19,12 @@ use std::{
task::{Context, Poll, Waker},
};
/// A read guard that holds access to an async derived resource.
///
/// Implements [`Deref`](std::ops::Deref) to access the inner value. This should not be held longer
/// than it is needed, as it prevents updates to the inner value.
pub type AsyncDerivedGuard<T> = ReadGuard<T, Mapped<AsyncPlain<Option<T>>, T>>;
/// A [`Future`] that is ready when an [`ArcAsyncDerived`] is finished loading or reloading,
/// but does not contain its value.
pub struct ArcAsyncDerivedReadyFuture {
@ -45,7 +52,7 @@ impl Future for ArcAsyncDerivedReadyFuture {
/// and contains its value.
pub struct ArcAsyncDerivedFuture<T> {
source: AnySource,
value: Arc<RwLock<Option<T>>>,
value: Arc<async_lock::RwLock<Option<T>>>,
loading: Arc<AtomicBool>,
wakers: Arc<RwLock<Vec<Waker>>>,
}
@ -54,7 +61,7 @@ impl<T> IntoFuture for ArcAsyncDerived<T>
where
T: Clone + 'static,
{
type Output = T;
type Output = AsyncDerivedGuard<T>;
type IntoFuture = ArcAsyncDerivedFuture<T>;
fn into_future(self) -> Self::IntoFuture {
@ -73,16 +80,24 @@ impl<T> Future for ArcAsyncDerivedFuture<T>
where
T: Clone + 'static,
{
type Output = T;
type Output = AsyncDerivedGuard<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let waker = cx.waker();
self.source.track();
if self.loading.load(Ordering::Relaxed) {
self.wakers.write().or_poisoned().push(waker.clone());
Poll::Pending
} else {
Poll::Ready(self.value.read().or_poisoned().clone().unwrap())
let value = self.value.read_arc();
pin_mut!(value);
match (self.loading.load(Ordering::Relaxed), value.poll(cx)) {
(true, _) => {
self.wakers.write().or_poisoned().push(waker.clone());
Poll::Pending
}
(_, Poll::Pending) => Poll::Pending,
(_, Poll::Ready(guard)) => Poll::Ready(ReadGuard::new(
Mapped::new_with_guard(AsyncPlain { guard }, |guard| {
guard.as_ref().unwrap()
}),
)),
}
}
}
@ -101,7 +116,7 @@ impl<T> IntoFuture for AsyncDerived<T>
where
T: Send + Sync + Clone + 'static,
{
type Output = T;
type Output = AsyncDerivedGuard<T>;
type IntoFuture = AsyncDerivedFuture<T>;
fn into_future(self) -> Self::IntoFuture {
@ -112,13 +127,11 @@ where
}
}
// this is implemented to output T by cloning it because read guards should not be held across
// .await points, and it's way too easy to trip up by doing that!
impl<T> Future for AsyncDerivedFuture<T>
where
T: Send + Sync + Clone + 'static,
{
type Output = T;
type Output = AsyncDerivedGuard<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

View File

@ -1,4 +1,4 @@
use crate::traits::Trigger;
use crate::{computed::BlockingLock, traits::Trigger};
use core::fmt::Debug;
use guardian::ArcRwLockReadGuardian;
use std::{
@ -120,6 +120,50 @@ impl<T: Display> Display for Plain<T> {
}
}
pub struct AsyncPlain<T: 'static> {
pub(crate) guard: async_lock::RwLockReadGuardArc<T>,
}
impl<T: 'static> Debug for AsyncPlain<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AsyncPlain").finish()
}
}
impl<T: 'static> AsyncPlain<T> {
pub(crate) fn try_new(inner: &Arc<async_lock::RwLock<T>>) -> Option<Self> {
Some(Self {
guard: inner.blocking_read_arc(),
})
}
}
impl<T> Deref for AsyncPlain<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.guard.deref()
}
}
impl<T: PartialEq> PartialEq for AsyncPlain<T> {
fn eq(&self, other: &Self) -> bool {
**self == **other
}
}
impl<T: PartialEq> PartialEq<T> for AsyncPlain<T> {
fn eq(&self, other: &T) -> bool {
**self == *other
}
}
impl<T: Display> Display for AsyncPlain<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Display::fmt(&**self, f)
}
}
#[derive(Debug)]
pub struct Mapped<Inner, U>
where
@ -139,6 +183,18 @@ impl<T: 'static, U> Mapped<Plain<T>, U> {
}
}
impl<Inner, U> Mapped<Inner, U>
where
Inner: Deref,
{
pub(crate) fn new_with_guard(
inner: Inner,
map_fn: fn(&Inner::Target) -> &U,
) -> Self {
Self { inner, map_fn }
}
}
impl<Inner, U> Deref for Mapped<Inner, U>
where
Inner: Deref,