From a6534a81206153cfc4590a22de9bdeddb9be2e37 Mon Sep 17 00:00:00 2001 From: Samuel Guerra Date: Sat, 3 Jun 2023 18:43:54 -0300 Subject: [PATCH] More WATCHER.sync bug fixes. --- tests/config.rs | 2 +- zero-ui-core/src/fs_watcher.rs | 177 +++++++++++++++++++++++++++------ 2 files changed, 150 insertions(+), 29 deletions(-) diff --git a/tests/config.rs b/tests/config.rs index 0683c1293..654cd2c01 100644 --- a/tests/config.rs +++ b/tests/config.rs @@ -314,7 +314,7 @@ fn fallback_swap() { std::fs::rename(main_prepared_cfg, main_cfg).unwrap(); app.update(false).assert_wait(); app.run_task(async { - task::deadline(500.ms()).await; // wait for system rename event (+ debounce) + task::deadline(1.5.secs()).await; // wait for system rename event (+ debounce) task::with_deadline(CONFIG.wait_idle(), 5.secs()).await.unwrap(); }); let status = CONFIG.status().get(); diff --git a/zero-ui-core/src/fs_watcher.rs b/zero-ui-core/src/fs_watcher.rs index f344af4e6..896260fd5 100644 --- a/zero-ui-core/src/fs_watcher.rs +++ b/zero-ui-core/src/fs_watcher.rs @@ -6,7 +6,7 @@ use std::{ mem, ops, path::{Path, PathBuf}, sync::{atomic::AtomicBool, Arc}, - time::{Duration, Instant, SystemTime}, + time::{Duration, Instant}, }; use atomic::{Atomic, Ordering}; @@ -407,6 +407,16 @@ impl WATCHER { args.events_for_path(&dir).next().is_some() })) } + + /// Push a `note` that will be cloned on all subsequent change events until it the returned handle is dropped. + /// + /// This can be used to tag all events that happened over a period of time, something you can't do just + /// by receiving the events due to async delays caused by debounce. + /// + /// Note that the underlying system events the [`notify`] crate uses are not guaranteed to be synchronous. + pub fn annotate(&self, note: Arc) -> FsChangeNoteHandle { + WATCHER_SV.write().annotate(note) + } } /// Represents a status type for [`WATCHER.sync_status`]. @@ -794,11 +804,86 @@ impl std::error::Error for WatchFileParseError &dyn std::any::Any; +} +impl FsChangeNote for T { + fn as_any(&self) -> &dyn std::any::Any { + self + } +} + +/// Handle that holds a [`WATCHER.annotate`] note. +/// +/// [`WATCHER.annotate`]: WATCHER::annotate +#[derive(Clone)] +#[must_use = "the note is removed when the handle is dropped"] +pub struct FsChangeNoteHandle(Arc>); + +/// Annotation for file watcher events. +/// +/// Identifies the [`WATCHER.sync`] file that is currently being written to. +/// +/// [`WATCHER.sync`]: WATCHER::sync +#[derive(Debug)] +pub struct WatcherSyncWriteNote(PathBuf); +impl WatcherSyncWriteNote { + /// Deref. + pub fn as_path(&self) -> &Path { + self + } +} +impl ops::Deref for WatcherSyncWriteNote { + type Target = Path; + + fn deref(&self) -> &Self::Target { + self.0.as_path() + } +} + +/// Represents a single file system change, annotated. +#[derive(Debug)] +pub struct FsChange { + /// All [`WATCHER.annotate`] that where set when this event happened. + /// + /// [`WATCHER.annotate`]: WATCHER::annotate + pub notes: Vec>, + + /// The actual notify event or error. + pub event: notify::Result, +} +impl FsChange { + /// If the change affects the `path`. + pub fn is_for_path(&self, path: &Path) -> bool { + if let Ok(ev) = &self.event { + return ev.paths.iter().any(|p| p.starts_with(path)); + } + false + } + + /// If the change affects any path matched by the glob pattern. + pub fn is_for_glob(&self, pattern: &glob::Pattern) -> bool { + if let Ok(ev) = &self.event { + return ev.paths.iter().any(|p| pattern.matches_path(p)); + } + false + } + + /// Iterate over all notes of the type `T`. + pub fn notes(&self) -> impl Iterator { + self.notes.iter().filter_map(|n| FsChangeNote::as_any(&**n).downcast_ref::()) + } +} + event_args! { /// [`FS_CHANGES_EVENT`] arguments. pub struct FsChangesArgs { /// All notify changes since the last event. - pub changes: Arc>>, + pub changes: Arc>, .. @@ -811,12 +896,12 @@ event_args! { impl FsChangesArgs { /// Iterate over all change events. pub fn events(&self) -> impl Iterator + '_ { - self.changes.iter().filter_map(|r| r.as_ref().ok()) + self.changes.iter().filter_map(|r| r.event.as_ref().ok()) } /// Iterate over all file watcher errors. pub fn errors(&self) -> impl Iterator + '_ { - self.changes.iter().filter_map(|r| r.as_ref().err()) + self.changes.iter().filter_map(|r| r.event.as_ref().err()) } /// Returns `true` is some events where lost. @@ -831,7 +916,18 @@ impl FsChangesArgs { self.events().any(|e| e.need_rescan()) } - /// Iterate over all change events that affects paths selected by the `glob` pattern. + /// Iterate over all changes that affects paths selected by the `glob` pattern. + pub fn changes_for(&self, glob: &str) -> Result + '_, glob::PatternError> { + let glob = glob::Pattern::new(glob)?; + Ok(self.changes.iter().filter(move |c| c.is_for_glob(&glob))) + } + + /// Iterate over all changes that affects paths that are equal to `path` or inside it. + pub fn changes_for_path<'a>(&'a self, path: &'a Path) -> impl Iterator + 'a { + self.changes.iter().filter(move |c| c.is_for_path(path)) + } + + /// Iterate over all change events that affects that are equal to `path` or inside it. pub fn events_for(&self, glob: &str) -> Result + '_, glob::PatternError> { let glob = glob::Pattern::new(glob)?; Ok(self.events().filter(move |ev| ev.paths.iter().any(|p| glob.matches_path(p)))) @@ -896,11 +992,13 @@ struct WatcherService { watcher: Watchers, debounce_oldest: Instant, - debounce_buffer: Vec>, + debounce_buffer: Vec, debounce_timer: Option, read_to_var: Vec, sync_with_var: Vec, + + notes: Vec>>, } impl WatcherService { fn new() -> Self { @@ -914,6 +1012,7 @@ impl WatcherService { debounce_timer: None, read_to_var: vec![], sync_with_var: vec![], + notes: vec![], } } @@ -1008,7 +1107,16 @@ impl WatcherService { let notify = self.debounce_oldest.elapsed() >= self.debounce.get(); - self.debounce_buffer.push(r); + let mut notes = Vec::with_capacity(self.notes.len()); + self.notes.retain(|n| match n.upgrade() { + Some(n) => { + notes.push(Arc::clone(&*n)); + true + } + None => false, + }); + + self.debounce_buffer.push(FsChange { notes, event: r }); if notify { self.notify(); @@ -1022,6 +1130,12 @@ impl WatcherService { } } + fn annotate(&mut self, note: Arc) -> FsChangeNoteHandle { + let handle = Arc::new(note); + self.notes.push(Arc::downgrade(&handle)); + FsChangeNoteHandle(handle) + } + fn on_debounce_timer(&mut self) { if !self.debounce_buffer.is_empty() { self.notify(); @@ -1153,7 +1267,7 @@ impl SyncWithVar { file = p.into_owned(); } - let path = Arc::new(file); + let path = Arc::new(WatcherSyncWriteNote(file)); let var = var(init); struct TaskData { @@ -1161,14 +1275,12 @@ impl SyncWithVar { read_write: Mutex<(R, W)>, wk_var: WeakArcVar, last_write: Atomic>, - modified: Atomic>, } let task_data = Arc::new(TaskData { pending: Atomic::new(SyncFlags::empty()), read_write: Mutex::new((read, write)), wk_var: var.downgrade(), last_write: Atomic::new(None), - modified: Atomic::new(None), }); // task drains pending, drops handle if the var is dropped. @@ -1193,10 +1305,25 @@ impl SyncWithVar { } } SyncEvent::Event(args) => { - if args.events_for_path(&path).next().is_some() { + if args.rescan() { SyncFlags::atomic_insert(&task_data.pending, SyncFlags::READ); } else { - return; + let mut read = false; + 'ev: for ev in args.changes_for_path(&path) { + for note in ev.notes::() { + if path.as_path() == note.as_path() { + // we caused this event + continue 'ev; + } + } + + SyncFlags::atomic_insert(&task_data.pending, SyncFlags::READ); + read = true; + break; + } + if !read { + return; + } } } SyncEvent::Init => { @@ -1243,10 +1370,9 @@ impl SyncWithVar { return; }; - write(value, WriteFile::open(path.to_path_buf())); - - if let Ok(m) = std::fs::metadata(&*path).and_then(|m| m.modified()) { - task_data.modified.store(Some(m), Ordering::Relaxed); + { + let _note = WATCHER.annotate(path.clone()); + write(value, WriteFile::open(path.to_path_buf())); } if task_data.wk_var.strong_count() == 0 { @@ -1259,18 +1385,7 @@ impl SyncWithVar { return; } - let file = WatchFile::open(path.as_path()); - if let Ok(f) = &file { - if let Ok(m) = f.metadata().and_then(|f| f.modified()) { - let last = task_data.modified.swap(Some(m), Ordering::Relaxed); - if last == Some(m) { - // already handled - return; - } - } - } - - if let Some(update) = read(file) { + if let Some(update) = read(WatchFile::open(path.as_path())) { if let Some(var) = task_data.wk_var.upgrade() { SyncFlags::atomic_insert(&task_data.pending, SyncFlags::SKIP_WRITE); var.set(update); @@ -1567,6 +1682,12 @@ impl Watchers { } fn allow(&mut self, r: ¬ify::Event) -> bool { + if let notify::EventKind::Access(_) = r.kind { + if !r.need_rescan() { + return false; + } + } + for (dir, w) in &mut self.dirs { let mut matched = false;