From 49bd2c5c5d23385d6d0ee82f7b6bbac4b40c0ce9 Mon Sep 17 00:00:00 2001 From: Samuel Guerra Date: Tue, 20 Jun 2023 18:13:07 -0300 Subject: [PATCH] More var and config sync work. --- TODO/_current.md | 20 ++++- tests/config.rs | 22 ++--- zero-ui-core/src/fs_watcher.rs | 129 +++++++++++++---------------- zero-ui-core/src/var.rs | 36 +++++++- zero-ui-core/src/var/tests.rs | 8 +- zero-ui-core/src/var/vars.rs | 5 +- zero-ui-core/src/window/service.rs | 11 --- 7 files changed, 126 insertions(+), 105 deletions(-) diff --git a/TODO/_current.md b/TODO/_current.md index 90b358b8a..c42511065 100644 --- a/TODO/_current.md +++ b/TODO/_current.md @@ -44,6 +44,14 @@ * Refactor into an extension trait. - Is more discoverable as an extension trait, maybe suggested by tooling (rustc, ra)? +```rust +/// Extensions methods for [`WINDOW`] contexts of windows open by [`WINDOWS`]. +#[allow(non_camel_case_types)] +pub trait WINDOW_Ext { + +} +impl WINDOW_Ext for WINDOW { } +``` # View-Process @@ -66,4 +74,14 @@ * Image paste some pixel columns swapped (wrap around start). - Some corrupted pixels, probably same reason. -* Screenshot paste does not have scale-factor. \ No newline at end of file +* Screenshot paste does not have scale-factor. + +# Config + +* Status race condition: + - Test `fallback_reset_entry` in a loop. + - Initial write starts immediately. + - At the "same time" the config updates to an actual value (status set to write). + - Initial write finishes, sets status to idle. + - Waiter is released. + - Actual value write starts. \ No newline at end of file diff --git a/tests/config.rs b/tests/config.rs index 3f7b98b8a..39a87c190 100644 --- a/tests/config.rs +++ b/tests/config.rs @@ -221,7 +221,7 @@ fn concurrent_read_write() { rmv_file_assert(&file); let mut app = App::default().run_headless(false); CONFIG.load(JsonConfig::sync(&file)); - CONFIG.get("key", || Txt::from_static("default")).set("custom").unwrap(); + CONFIG.get("key", || Txt::from_static("default/custom")).set("custom").unwrap(); app.run_task(async { task::with_deadline(CONFIG.wait_idle(), 10.secs()).await.unwrap(); @@ -243,7 +243,7 @@ fn concurrent_read_write() { task::with_deadline(CONFIG.wait_idle(), 10.secs()).await.unwrap(); }); - let var = CONFIG.get("key", || Txt::from_static("default")); + let var = CONFIG.get("key", || Txt::from_static("default/get")); for _ in 0..8 { assert_eq!("custom", var.get()); var.set("custom").unwrap(); @@ -272,7 +272,7 @@ fn fallback_swap() { let mut app = App::default().run_headless(false); CONFIG.load(JsonConfig::sync(&fallback_cfg)); - CONFIG.get("key", || Txt::from_static("default")).set("fallback").unwrap(); + CONFIG.get("key", || Txt::from_static("default/fallback")).set("fallback").unwrap(); app.update(false).assert_wait(); app.run_task(async { @@ -284,7 +284,7 @@ fn fallback_swap() { } CONFIG.load(JsonConfig::sync(&main_prepared_cfg)); - CONFIG.get("key", || Txt::from_static("default")).set("main").unwrap(); + CONFIG.get("key", || Txt::from_static("default/main")).set("main").unwrap(); app.run_task(async { task::with_deadline(CONFIG.wait_idle(), 10.secs()).await.unwrap(); @@ -309,7 +309,7 @@ fn fallback_swap() { app.update(false).assert_wait(); - let key = CONFIG.get("key", || Txt::from_static("final-default")); + let key = CONFIG.get("key", || Txt::from_static("default/get")); assert_eq!("fallback", key.get()); std::fs::rename(main_prepared_cfg, main_cfg).unwrap(); @@ -338,7 +338,7 @@ fn fallback_reset() { let mut app = App::default().run_headless(false); CONFIG.load(JsonConfig::sync(&fallback_cfg)); - CONFIG.get("key", || Txt::from_static("default")).set("fallback").unwrap(); + CONFIG.get("key", || Txt::from_static("default/fallback")).set("fallback").unwrap(); app.update(false).assert_wait(); app.run_task(async { @@ -350,7 +350,7 @@ fn fallback_reset() { } CONFIG.load(JsonConfig::sync(&main_cfg)); - CONFIG.get("key", || Txt::from_static("default")).set("main").unwrap(); + CONFIG.get("key", || Txt::from_static("default/main")).set("main").unwrap(); app.run_task(async { task::with_deadline(CONFIG.wait_idle(), 10.secs()).await.unwrap(); @@ -375,7 +375,7 @@ fn fallback_reset() { app.update(false).assert_wait(); - let key = CONFIG.get("key", || Txt::from_static("final-default")); + let key = CONFIG.get("key", || Txt::from_static("default/get")); assert_eq!("main", key.get()); CONFIG.load(MemoryConfig::default()); @@ -406,7 +406,7 @@ fn fallback_reset_entry() { let mut app = App::default().run_headless(false); CONFIG.load(JsonConfig::sync(&fallback_cfg)); - CONFIG.get("key", || Txt::from_static("default")).set("fallback").unwrap(); + CONFIG.get("key", || Txt::from_static("default/fallback")).set("fallback").unwrap(); app.update(false).assert_wait(); app.run_task(async { @@ -418,7 +418,7 @@ fn fallback_reset_entry() { } CONFIG.load(JsonConfig::sync(&main_cfg)); - CONFIG.get("key", || Txt::from_static("default")).set("main").unwrap(); + CONFIG.get("key", || Txt::from_static("default/main")).set("main").unwrap(); app.run_task(async { task::with_deadline(CONFIG.wait_idle(), 10.secs()).await.unwrap(); @@ -448,7 +448,7 @@ fn fallback_reset_entry() { .unwrap(); }); - let key = cfg.get("key", || Txt::from_static("default")); + let key = cfg.get("key", || Txt::from_static("default/get")); assert_eq!("main", key.get()); cfg.reset(&ConfigKey::from_static("key")); diff --git a/zero-ui-core/src/fs_watcher.rs b/zero-ui-core/src/fs_watcher.rs index 8a923fe1e..c03f079df 100644 --- a/zero-ui-core/src/fs_watcher.rs +++ b/zero-ui-core/src/fs_watcher.rs @@ -765,12 +765,12 @@ impl FsChangeNote for T { #[must_use = "the note is removed when the handle is dropped"] pub struct FsChangeNoteHandle(Arc>); -/// Annotation for file watcher events. +/// Annotation for file watcher events and var update tags. /// /// Identifies the [`WATCHER.sync`] file that is currently being written to. /// /// [`WATCHER.sync`]: WATCHER::sync -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] pub struct WatcherSyncWriteNote(PathBuf); impl WatcherSyncWriteNote { /// Deref. @@ -1026,6 +1026,7 @@ impl WatcherService { file, init, open, + // read clmv!(status, |d| { status.set(S::reading()); match read(d) { @@ -1041,6 +1042,7 @@ impl WatcherService { } } }), + // on_modify clmv!(status, || { status.set(S::idle()); }), @@ -1094,6 +1096,7 @@ impl WatcherService { dir, init, if recursive { open_recursive } else { open }, + // read clmv!(status, |d| { status.set(S::reading()); match read(d) { @@ -1109,6 +1112,7 @@ impl WatcherService { } } }), + // on_modify clmv!(status, || { status.set(S::idle()); }), @@ -1127,7 +1131,7 @@ impl WatcherService { ) -> ArcVar { let handle = self.watch(file.clone()); - let (sync, var) = SyncWithVar::new(handle, file, init, read, write, || {}); + let (sync, var) = SyncWithVar::new(handle, file, init, read, write, |_| {}); self.sync_with_var.push(sync); var } @@ -1146,20 +1150,16 @@ impl WatcherService { let handle = self.watch(file.clone()); let status = var(S::reading()); - let next_var_update_status = Arc::new(Atomic::new(S::writing as fn() -> S)); - let (sync, var) = SyncWithVar::new( handle, file, init, - clmv!(status, next_var_update_status, |f| { + // read + clmv!(status, |f| { status.set(S::reading()); match read(f) { Ok(r) => { - if r.is_some() { - // status handled by `on_modify`. - next_var_update_status.store(S::idle, atomic::Ordering::Relaxed); - } else { + if r.is_none() { status.set(S::idle()); } r @@ -1170,8 +1170,9 @@ impl WatcherService { } } }), + // write clmv!(status, |o, f| { - status.set(S::writing()); + status.set(S::writing()); // init write match write(o, f) { Ok(()) => { status.set(S::idle()); @@ -1181,9 +1182,9 @@ impl WatcherService { } } }), - clmv!(status, || { - let status_fn = next_var_update_status.swap(S::writing, atomic::Ordering::Relaxed); - status.set(status_fn()); + // hook&modify + clmv!(status, |is_read| { + status.set(if is_read { S::idle() } else { S::writing() }); }), ); @@ -1356,30 +1357,48 @@ struct SyncWithVar { handle: WatcherHandle, } impl SyncWithVar { - fn new(handle: WatcherHandle, mut file: PathBuf, init: O, read: R, write: W, on_modify: U) -> (Self, ArcVar) + fn new(handle: WatcherHandle, mut file: PathBuf, init: O, read: R, write: W, var_hook_and_modify: U) -> (Self, ArcVar) where O: VarValue, R: FnMut(io::Result) -> Option + Send + 'static, W: FnMut(O, io::Result) + Send + 'static, - U: Fn() + Send + Sync + 'static, + U: Fn(bool) + Send + Sync + 'static, { if let Ok(p) = file.absolutize() { file = p.into_owned(); } let path = Arc::new(WatcherSyncWriteNote(file)); - let var = var(init); + let latest_from_read = Arc::new(AtomicBool::new(false)); - let on_modify = Arc::new(on_modify); + let var_hook_and_modify = Arc::new(var_hook_and_modify); + + let var = var(init); + var.hook(Box::new(clmv!( + path, + latest_from_read, + var_hook_and_modify, + |args: &VarHookArgs| { + let is_read = args.downcast_tags::>().any(|n| n == &path); + latest_from_read.store(is_read, Ordering::Relaxed); + var_hook_and_modify(is_read); + true + } + ))) + .perm(); + + type PendingFlag = u8; + const READ: PendingFlag = 0b01; + const WRITE: PendingFlag = 0b11; struct TaskData { - pending: Atomic, + pending: Atomic, read_write: Mutex<(R, W)>, wk_var: WeakArcVar, last_write: Atomic>, } let task_data = Arc::new(TaskData { - pending: Atomic::new(SyncFlags::empty()), + pending: Atomic::new(0), read_write: Mutex::new((read, write)), wk_var: var.downgrade(), last_write: Atomic::new(None), @@ -1397,20 +1416,21 @@ impl SyncWithVar { let mut debounce = None; + let mut pending = 0; + match ev { SyncEvent::Update(sync_debounce) => { - if var.is_new() && !SyncFlags::pop(&task_data.pending, SyncFlags::SKIP_WRITE) { + if var.is_new() && !latest_from_read.load(Ordering::Relaxed) { debounce = Some(sync_debounce); - SyncFlags::atomic_insert(&task_data.pending, SyncFlags::WRITE); + pending |= WRITE; } else { return; } } SyncEvent::Event(args) => { if args.rescan() { - SyncFlags::atomic_insert(&task_data.pending, SyncFlags::READ); + pending |= READ; } else { - let mut read = false; 'ev: for ev in args.changes_for_path(&path) { for note in ev.notes::() { if path.as_path() == note.as_path() { @@ -1419,20 +1439,19 @@ impl SyncWithVar { } } - SyncFlags::atomic_insert(&task_data.pending, SyncFlags::READ); - read = true; + pending |= READ; break; } - if !read { + if pending == 0 { return; } } } SyncEvent::Init => { if path.exists() { - SyncFlags::atomic_insert(&task_data.pending, SyncFlags::READ); + pending |= READ; } else { - SyncFlags::atomic_insert(&task_data.pending, SyncFlags::WRITE); + pending |= WRITE; } } SyncEvent::FlushShutdown => { @@ -1442,19 +1461,20 @@ impl SyncWithVar { }; drop(var); + task_data.pending.fetch_or(pending, Ordering::Relaxed); + if task_data.read_write.try_lock().is_none() { // another spawn is already applying return; } - task::spawn_wait(clmv!(task_data, path, handle, on_modify, || { + task::spawn_wait(clmv!(task_data, path, var_hook_and_modify, handle, || { let mut read_write = task_data.read_write.lock(); let (read, write) = &mut *read_write; loop { - let w = SyncFlags::pop(&task_data.pending, SyncFlags::WRITE); - let r = SyncFlags::pop(&task_data.pending, SyncFlags::READ); + let pending = task_data.pending.swap(0, Ordering::Relaxed); - if w { + if pending == WRITE { if let Some(d) = debounce { if let Some(t) = task_data.last_write.load(Ordering::Relaxed) { let elapsed = t.elapsed(); @@ -1481,7 +1501,7 @@ impl SyncWithVar { handle.force_drop(); return; } - } else if r { + } else if pending == READ { if task_data.wk_var.strong_count() == 0 { handle.force_drop(); return; @@ -1489,11 +1509,10 @@ impl SyncWithVar { if let Some(update) = read(WatchFile::open(path.as_path())) { if let Some(var) = task_data.wk_var.upgrade() { - SyncFlags::atomic_insert(&task_data.pending, SyncFlags::SKIP_WRITE); - - var.modify(clmv!(on_modify, |vm| { + var.modify(clmv!(path, var_hook_and_modify, |vm| { vm.set(update); - on_modify(); + vm.push_tag(path); + var_hook_and_modify(true); })); } else { handle.force_drop(); @@ -1542,40 +1561,6 @@ enum SyncEvent<'a> { Init, FlushShutdown, } -bitflags! { - #[derive(Clone, Copy)] - struct SyncFlags: u8 { - const READ = 0b0000_0001; - const WRITE = 0b0000_0010; - const SKIP_WRITE = 0b0010_0000; - } -} -impl SyncFlags { - fn atomic_insert(f: &Atomic, flag: Self) { - let _ = f.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |mut f| { - if f.contains(flag) { - None - } else { - f.insert(flag); - Some(f) - } - }); - } - - fn pop(f: &Atomic, flag: Self) -> bool { - let mut contains = false; - let _ = f.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |mut f| { - if f.contains(flag) { - contains = true; - f.remove(flag); - Some(f) - } else { - None - } - }); - contains - } -} struct Watchers { dirs: HashMap, diff --git a/zero-ui-core/src/var.rs b/zero-ui-core/src/var.rs index bb9f23bbf..9ab38a4d6 100644 --- a/zero-ui-core/src/var.rs +++ b/zero-ui-core/src/var.rs @@ -1076,6 +1076,25 @@ impl Clone for OnVarArgs { } } +/// Args for [`Var::trace_value`]. +pub struct TraceValueArgs<'a, T: VarValue> { + args: &'a VarHookArgs<'a>, + _type: PhantomData<&'a T>, +} +impl<'a, T: VarValue> ops::Deref for TraceValueArgs<'a, T> { + type Target = VarHookArgs<'a>; + + fn deref(&self) -> &Self::Target { + self.args + } +} +impl<'a, T: VarValue> TraceValueArgs<'a, T> { + /// Strongly-typed reference to the new value. + pub fn value(&self) -> &'a T { + self.args.downcast_value::().unwrap() + } +} + /// Represents an observable value. /// /// All variable types can be read, some can update, variables update only in between app updates so @@ -1863,13 +1882,22 @@ pub trait Var: IntoVar + AnyVar + Clone { /// [`actual_var`]: Var::actual_var fn trace_value(&self, mut enter_value: E) -> VarHandle where - E: FnMut(&OnVarArgs) -> S + Send + 'static, + E: FnMut(&TraceValueArgs) -> S + Send + 'static, S: Send + 'static, { - let mut span = Some(enter_value(&OnVarArgs::new(self.get(), vec![]))); - self.on_pre_new(app_hn!(|args, _| { + let span = self.with(|v| { + enter_value(&TraceValueArgs { + args: &VarHookArgs::new(v, false, &[]), + _type: PhantomData, + }) + }); + let data = Mutex::new((Some(span), enter_value)); + self.hook(Box::new(move |args| { + let mut data = data.lock(); + let (span, enter_value) = &mut *data; let _ = span.take(); - span = Some(enter_value(args)); + *span = Some(enter_value(&TraceValueArgs { args, _type: PhantomData })); + true })) } diff --git a/zero-ui-core/src/var/tests.rs b/zero-ui-core/src/var/tests.rs index 2a49972af..ad27fb789 100644 --- a/zero-ui-core/src/var/tests.rs +++ b/zero-ui-core/src/var/tests.rs @@ -1021,8 +1021,8 @@ mod cow { let base_values = Arc::new(Mutex::new(vec![])); let cow_values = Arc::new(Mutex::new(vec![])); - base.trace_value(clmv!(base_values, |v| base_values.lock().push(v.value))).perm(); - cow.trace_value(clmv!(cow_values, |v| cow_values.lock().push(v.value))).perm(); + base.trace_value(clmv!(base_values, |v| base_values.lock().push(*v.value()))).perm(); + cow.trace_value(clmv!(cow_values, |v| cow_values.lock().push(*v.value()))).perm(); base.set(1); app.update(false).assert_wait(); @@ -1069,8 +1069,8 @@ mod multi { let a_values = Arc::new(Mutex::new(vec![])); let b_values = Arc::new(Mutex::new(vec![])); - a.trace_value(clmv!(a_values, |v| a_values.lock().push(v.value))).perm(); - b.trace_value(clmv!(b_values, |v| b_values.lock().push(v.value))).perm(); + a.trace_value(clmv!(a_values, |v| a_values.lock().push(*v.value()))).perm(); + b.trace_value(clmv!(b_values, |v| b_values.lock().push(*v.value()))).perm(); assert!(!a.get()); assert_eq!(b.get(), 0); diff --git a/zero-ui-core/src/var/vars.rs b/zero-ui-core/src/var/vars.rs index 45ae5dd38..b6c8c2c17 100644 --- a/zero-ui-core/src/var/vars.rs +++ b/zero-ui-core/src/var/vars.rs @@ -273,17 +273,18 @@ impl VARS { drop(vars); update_each_and_bindings(updates, 0); + vars = VARS_SV.write(); vars.updating_thread = None; if !vars.updates_after.get_mut().is_empty() { - drop(vars); - let depth = depth + 1; if depth == 10 { // high-pressure from worker threads, skip return; } + + drop(vars); Self::apply_updates_and_after(depth) } } diff --git a/zero-ui-core/src/window/service.rs b/zero-ui-core/src/window/service.rs index fd8b94712..ec4d2b2b5 100644 --- a/zero-ui-core/src/window/service.rs +++ b/zero-ui-core/src/window/service.rs @@ -1377,17 +1377,6 @@ impl fmt::Debug for WindowLoadingHandle { } } -/* - -/// Extensions methods for [`WINDOW`] contexts of windows open by [`WINDOWS`]. -#[allow(non_camel_case_types)] -pub trait WINDOW_Ext { - -} -impl WINDOW_Ext for WINDOW { } - - */ - /// Control and variables of the context [`WINDOW`] created using [`WINDOWS`]. #[allow(non_camel_case_types)] pub struct WINDOW_CTRL;