Refactored var update waiter to be more clear.

Improved handling of
This commit is contained in:
Samuel Guerra 2023-06-20 14:32:46 -03:00
parent b0a427f344
commit f289ae54b5
11 changed files with 110 additions and 75 deletions

View File

@ -110,7 +110,7 @@ fn images_render() {
app.run_task(async move {
while img.with(Img::is_loading) {
img.wait_new().await;
img.wait_update().await;
}
let img = img.get();

View File

@ -439,7 +439,7 @@ fn fallback_reset_entry() {
task::with_deadline(
async move {
while !status.get().is_idle() {
status.wait_is_new().await;
status.wait_update().await;
}
},
5.secs(),
@ -462,7 +462,7 @@ fn fallback_reset_entry() {
task::with_deadline(
async move {
while !status.get().is_idle() {
status.wait_is_new().await;
status.wait_update().await;
}
},
5.secs(),

View File

@ -94,7 +94,7 @@ impl CONFIG {
let status = self.status();
while !status.get().is_idle() {
status.wait_is_new().await;
status.wait_update().await;
}
}

View File

@ -223,7 +223,7 @@ impl L10N {
let status = self.available_langs_status();
while matches!(status.get(), LangResourceStatus::Loading) {
status.wait_is_new().await;
status.wait_update().await;
}
}

View File

@ -74,7 +74,7 @@ impl LangResource {
/// Await resource status to not be loading.
pub async fn wait(&self) {
while matches!(self.status.get(), LangResourceStatus::Loading) {
self.status.wait_is_new().await;
self.status.wait_update().await;
}
}
}

View File

@ -78,7 +78,7 @@ pub mod types {
pub use super::cow::{ArcCowVar, WeakCowVar};
pub use super::expr::__expr_var;
pub use super::flat_map::{ArcFlatMapVar, WeakFlatMapVar};
pub use super::future::{WaitIsNewFut, WaitIsNotAnimatingFut, WaitNewFut};
pub use super::future::{WaitIsNotAnimatingFut, WaitUpdateFut};
pub use super::map_ref::{MapRef, MapRefBidi, WeakMapRef, WeakMapRefBidi};
pub use super::merge::{ArcMergeVar, ArcMergeVarInput, ContextualizedArcMergeVar, MergeVarInputs, WeakMergeVar, __merge_var};
pub use super::property_build_action::easing_property;
@ -1148,7 +1148,7 @@ pub trait Var<T: VarValue>: IntoVar<T, Var = Self> + AnyVar + Clone {
/// The returned variable can still update if `self` is modified, but it does not have the `MODIFY` capability.
fn read_only(&self) -> Self::ReadOnly;
/// Create a future that awaits for the [`VarUpdateId`] to change.
/// Create a future that awaits for the [`last_update`] to change.
///
/// The future can be reused. Note that [`is_new`] will be `true` when the future elapses only in [`UiTask`] updated
/// by the UI tree, but the future will elapse in any thread when the variable updates after the future is instantiated.
@ -1157,10 +1157,11 @@ pub trait Var<T: VarValue>: IntoVar<T, Var = Self> + AnyVar + Clone {
/// a sequence of `get(); wait_is_new().await; get();` can miss a value between `get` and `wait_is_new`.
///
/// [`get`]: Var::get
/// [`last_update`]: Var::last_update
/// [`is_new`]: AnyVar::is_new
/// [`UiTask`]: crate::task::ui::UiTask
fn wait_is_new(&self) -> types::WaitIsNewFut<Self> {
types::WaitIsNewFut::new(self)
fn wait_update(&self) -> types::WaitUpdateFut<Self> {
types::WaitUpdateFut::new(self)
}
/// Create a future that awaits for [`is_animating`] to change from `true` to `false`.
@ -1257,21 +1258,6 @@ pub trait Var<T: VarValue>: IntoVar<T, Var = Self> + AnyVar + Clone {
self.is_new() && self.get_ne(value)
}
/// Create a future that awaits until the [`VarUpdateId`] changes and yields [`get`].
///
/// The future can be reused. Note that [`is_new`] will be `true` when the future elapses only in [`UiTask`] updated
/// by the UI tree, but the future will elapse in any thread when the variable updates after the future is instantiated.
///
/// Note that outside of the UI tree there is no variable synchronization across multiple var method calls, so
/// a sequence of `get(); wait_new().await; get();` can miss a value between `get` and `wait_new`.
///
/// [`get`]: Var::get
/// [`is_new`]: AnyVar::is_new
/// [`UiTask`]: crate::task::ui::UiTask
fn wait_new(&self) -> types::WaitNewFut<T, Self> {
types::WaitNewFut::new(self)
}
/// Schedule a new `value` for the variable, it will be set in the end of the current app update.
fn set<I>(&self, value: I) -> Result<(), VarIsReadOnlyError>
where

View File

@ -1,37 +1,13 @@
use super::*;
use std::{future::*, marker::PhantomData, pin::Pin, task::Poll};
use std::{future::*, pin::Pin, task::Poll};
/// See [`Var::wait_new`].
pub struct WaitNewFut<'a, T: VarValue, V: Var<T>> {
is_new: WaitIsNewFut<'a, V>,
_value: PhantomData<&'a T>,
}
impl<'a, T: VarValue, V: Var<T>> WaitNewFut<'a, T, V> {
pub(super) fn new(var: &'a V) -> Self {
Self {
is_new: WaitIsNewFut::new(var),
_value: PhantomData,
}
}
}
impl<'a, T: VarValue, V: Var<T>> Future for WaitNewFut<'a, T, V> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<T> {
match self.is_new.poll_impl(cx) {
Poll::Ready(()) => Poll::Ready(self.is_new.var.get()),
Poll::Pending => Poll::Pending,
}
}
}
/// See [`Var::wait_is_new`].
pub struct WaitIsNewFut<'a, V: AnyVar> {
/// See [`Var::wait_update`].
pub struct WaitUpdateFut<'a, V: AnyVar> {
var: &'a V,
update_id: VarUpdateId,
}
impl<'a, V: AnyVar> WaitIsNewFut<'a, V> {
impl<'a, V: AnyVar> WaitUpdateFut<'a, V> {
pub(super) fn new(var: &'a V) -> Self {
Self {
update_id: var.last_update(),
@ -39,12 +15,12 @@ impl<'a, V: AnyVar> WaitIsNewFut<'a, V> {
}
}
fn poll_impl(&mut self, cx: &mut std::task::Context<'_>) -> Poll<()> {
fn poll_impl(&mut self, cx: &mut std::task::Context<'_>) -> Poll<VarUpdateId> {
let update_id = self.var.last_update();
if update_id != self.update_id {
// has changed since init or last poll
self.update_id = update_id;
Poll::Ready(())
Poll::Ready(update_id)
} else {
// has not changed since init or last poll, register hook
let waker = cx.waker().clone();
@ -59,7 +35,7 @@ impl<'a, V: AnyVar> WaitIsNewFut<'a, V> {
// changed in parallel
// the hook will be dropped (handle not perm), it may wake in parallel too, but poll checks again.
self.update_id = update_id;
Poll::Ready(())
Poll::Ready(update_id)
} else {
// really not ready yet
handle.perm();
@ -68,8 +44,8 @@ impl<'a, V: AnyVar> WaitIsNewFut<'a, V> {
}
}
}
impl<'a, V: AnyVar> Future for WaitIsNewFut<'a, V> {
type Output = ();
impl<'a, V: AnyVar> Future for WaitUpdateFut<'a, V> {
type Output = VarUpdateId;
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
self.poll_impl(cx)

View File

@ -129,7 +129,7 @@ impl<T: VarValue> ResponseVar<T> {
/// [`rsp`]: Self::rsp
pub async fn wait_done(&self) {
while !self.is_done() {
let w = self.wait_is_new();
let w = self.wait_update();
if self.is_done() {
break;
}

View File

@ -1084,3 +1084,58 @@ mod multi {
assert_eq!(&b_values.lock()[..], &[0, 1]);
}
}
mod threads {
use crate::{app::App, task};
use super::*;
#[test]
fn set_from_other_thread_once() {
let mut app = App::minimal().run_headless(false);
let test = var(1);
task::spawn(async_clmv!(test, {
test.set(2);
}));
let test = async move {
while test.get() != 2 {
test.wait_update().await;
}
};
app.run_task(task::with_deadline(test, 1.secs())).unwrap().unwrap();
}
#[test]
fn set_from_other_thread_many() {
let mut app = App::minimal().run_headless(false);
let test = var(1);
task::spawn(async_clmv!(test, {
for i in 2..=1000 {
test.set(i);
if i % 10 == 0 {
task::deadline(2.ms()).await;
}
}
}));
let mut prev = 0;
let test = async move {
loop {
let new = test.get();
assert!(prev < new, "{prev} < {new}");
if new == 1000 {
break;
}
prev = new;
test.wait_update().await;
}
};
app.run_task(task::with_deadline(test, 5.secs())).unwrap().unwrap();
}
}

View File

@ -248,6 +248,9 @@ impl VARS {
}
pub(crate) fn apply_updates(&self) {
Self::apply_updates_and_after(0)
}
fn apply_updates_and_after(depth: u8) {
let mut vars = VARS_SV.write();
vars.update_id.next();
@ -255,30 +258,34 @@ impl VARS {
drop(vars);
let mut vars = VARS_SV.write();
let updates = mem::take(vars.updates.get_mut());
// updates requested by other threads while was applying updates
let mut updates = mem::take(vars.updates_after.get_mut());
// normal updates
if updates.is_empty() {
updates = mem::take(vars.updates.get_mut());
} else {
updates.append(vars.updates.get_mut());
}
// apply pending updates
if !updates.is_empty() {
debug_assert!(vars.updating_thread.is_none());
vars.updating_thread = Some(std::thread::current().id());
drop(vars);
update_each_and_bindings(updates, 0);
}
// updated requested by other threads while was applying normal updates
let mut vars = VARS_SV.write();
let updates_after = mem::take(vars.updates_after.get_mut());
if !updates_after.is_empty() {
if vars.updating_thread.is_none() {
vars.updating_thread = Some(std::thread::current().id());
}
drop(vars);
update_each_and_bindings(updates_after, 0);
VARS_SV.write().updating_thread = None;
} else {
vars = VARS_SV.write();
vars.updating_thread = None;
if !vars.updates_after.get_mut().is_empty() {
drop(vars);
let depth = depth + 1;
if depth == 10 {
// high-pressure from worker threads, skip
return;
}
Self::apply_updates_and_after(depth)
}
}
fn update_each_and_bindings(updates: Vec<(ModifyInfo, VarUpdateFn)>, depth: u16) {

View File

@ -1377,6 +1377,17 @@ impl fmt::Debug for WindowLoadingHandle {
}
}
/*
/// Extensions methods for [`WINDOW`] contexts of windows open by [`WINDOWS`].
#[allow(non_camel_case_types)]
pub trait WINDOW_Ext {
}
impl WINDOW_Ext for WINDOW { }
*/
/// Control and variables of the context [`WINDOW`] created using [`WINDOWS`].
#[allow(non_camel_case_types)]
pub struct WINDOW_CTRL;