More WATCHER.sync bug fixes.

This commit is contained in:
Samuel Guerra 2023-06-03 18:43:54 -03:00
parent b633a058af
commit a6534a8120
2 changed files with 150 additions and 29 deletions

View File

@ -314,7 +314,7 @@ fn fallback_swap() {
std::fs::rename(main_prepared_cfg, main_cfg).unwrap();
app.update(false).assert_wait();
app.run_task(async {
task::deadline(500.ms()).await; // wait for system rename event (+ debounce)
task::deadline(1.5.secs()).await; // wait for system rename event (+ debounce)
task::with_deadline(CONFIG.wait_idle(), 5.secs()).await.unwrap();
});
let status = CONFIG.status().get();

View File

@ -6,7 +6,7 @@ use std::{
mem, ops,
path::{Path, PathBuf},
sync::{atomic::AtomicBool, Arc},
time::{Duration, Instant, SystemTime},
time::{Duration, Instant},
};
use atomic::{Atomic, Ordering};
@ -407,6 +407,16 @@ impl WATCHER {
args.events_for_path(&dir).next().is_some()
}))
}
/// Push a `note` that will be cloned on all subsequent change events until it the returned handle is dropped.
///
/// This can be used to tag all events that happened over a period of time, something you can't do just
/// by receiving the events due to async delays caused by debounce.
///
/// Note that the underlying system events the [`notify`] crate uses are not guaranteed to be synchronous.
pub fn annotate(&self, note: Arc<dyn FsChangeNote>) -> FsChangeNoteHandle {
WATCHER_SV.write().annotate(note)
}
}
/// Represents a status type for [`WATCHER.sync_status`].
@ -794,11 +804,86 @@ impl<E: std::error::Error + 'static> std::error::Error for WatchFileParseError<E
}
}
/// Represents a [`FsChange`] note.
///
/// This trait is already implemented for types it applies.
pub trait FsChangeNote: fmt::Debug + std::any::Any + Send + Sync {
/// Access any.
fn as_any(&self) -> &dyn std::any::Any;
}
impl<T: fmt::Debug + std::any::Any + Send + Sync> FsChangeNote for T {
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
/// Handle that holds a [`WATCHER.annotate`] note.
///
/// [`WATCHER.annotate`]: WATCHER::annotate
#[derive(Clone)]
#[must_use = "the note is removed when the handle is dropped"]
pub struct FsChangeNoteHandle(Arc<Arc<dyn FsChangeNote>>);
/// Annotation for file watcher events.
///
/// Identifies the [`WATCHER.sync`] file that is currently being written to.
///
/// [`WATCHER.sync`]: WATCHER::sync
#[derive(Debug)]
pub struct WatcherSyncWriteNote(PathBuf);
impl WatcherSyncWriteNote {
/// Deref.
pub fn as_path(&self) -> &Path {
self
}
}
impl ops::Deref for WatcherSyncWriteNote {
type Target = Path;
fn deref(&self) -> &Self::Target {
self.0.as_path()
}
}
/// Represents a single file system change, annotated.
#[derive(Debug)]
pub struct FsChange {
/// All [`WATCHER.annotate`] that where set when this event happened.
///
/// [`WATCHER.annotate`]: WATCHER::annotate
pub notes: Vec<Arc<dyn FsChangeNote>>,
/// The actual notify event or error.
pub event: notify::Result<notify::Event>,
}
impl FsChange {
/// If the change affects the `path`.
pub fn is_for_path(&self, path: &Path) -> bool {
if let Ok(ev) = &self.event {
return ev.paths.iter().any(|p| p.starts_with(path));
}
false
}
/// If the change affects any path matched by the glob pattern.
pub fn is_for_glob(&self, pattern: &glob::Pattern) -> bool {
if let Ok(ev) = &self.event {
return ev.paths.iter().any(|p| pattern.matches_path(p));
}
false
}
/// Iterate over all notes of the type `T`.
pub fn notes<T: FsChangeNote>(&self) -> impl Iterator<Item = &T> {
self.notes.iter().filter_map(|n| FsChangeNote::as_any(&**n).downcast_ref::<T>())
}
}
event_args! {
/// [`FS_CHANGES_EVENT`] arguments.
pub struct FsChangesArgs {
/// All notify changes since the last event.
pub changes: Arc<Vec<notify::Result<notify::Event>>>,
pub changes: Arc<Vec<FsChange>>,
..
@ -811,12 +896,12 @@ event_args! {
impl FsChangesArgs {
/// Iterate over all change events.
pub fn events(&self) -> impl Iterator<Item = &notify::Event> + '_ {
self.changes.iter().filter_map(|r| r.as_ref().ok())
self.changes.iter().filter_map(|r| r.event.as_ref().ok())
}
/// Iterate over all file watcher errors.
pub fn errors(&self) -> impl Iterator<Item = &notify::Error> + '_ {
self.changes.iter().filter_map(|r| r.as_ref().err())
self.changes.iter().filter_map(|r| r.event.as_ref().err())
}
/// Returns `true` is some events where lost.
@ -831,7 +916,18 @@ impl FsChangesArgs {
self.events().any(|e| e.need_rescan())
}
/// Iterate over all change events that affects paths selected by the `glob` pattern.
/// Iterate over all changes that affects paths selected by the `glob` pattern.
pub fn changes_for(&self, glob: &str) -> Result<impl Iterator<Item = &FsChange> + '_, glob::PatternError> {
let glob = glob::Pattern::new(glob)?;
Ok(self.changes.iter().filter(move |c| c.is_for_glob(&glob)))
}
/// Iterate over all changes that affects paths that are equal to `path` or inside it.
pub fn changes_for_path<'a>(&'a self, path: &'a Path) -> impl Iterator<Item = &FsChange> + 'a {
self.changes.iter().filter(move |c| c.is_for_path(path))
}
/// Iterate over all change events that affects that are equal to `path` or inside it.
pub fn events_for(&self, glob: &str) -> Result<impl Iterator<Item = &notify::Event> + '_, glob::PatternError> {
let glob = glob::Pattern::new(glob)?;
Ok(self.events().filter(move |ev| ev.paths.iter().any(|p| glob.matches_path(p))))
@ -896,11 +992,13 @@ struct WatcherService {
watcher: Watchers,
debounce_oldest: Instant,
debounce_buffer: Vec<notify::Result<notify::Event>>,
debounce_buffer: Vec<FsChange>,
debounce_timer: Option<DeadlineHandle>,
read_to_var: Vec<ReadToVar>,
sync_with_var: Vec<SyncWithVar>,
notes: Vec<std::sync::Weak<Arc<dyn FsChangeNote>>>,
}
impl WatcherService {
fn new() -> Self {
@ -914,6 +1012,7 @@ impl WatcherService {
debounce_timer: None,
read_to_var: vec![],
sync_with_var: vec![],
notes: vec![],
}
}
@ -1008,7 +1107,16 @@ impl WatcherService {
let notify = self.debounce_oldest.elapsed() >= self.debounce.get();
self.debounce_buffer.push(r);
let mut notes = Vec::with_capacity(self.notes.len());
self.notes.retain(|n| match n.upgrade() {
Some(n) => {
notes.push(Arc::clone(&*n));
true
}
None => false,
});
self.debounce_buffer.push(FsChange { notes, event: r });
if notify {
self.notify();
@ -1022,6 +1130,12 @@ impl WatcherService {
}
}
fn annotate(&mut self, note: Arc<dyn FsChangeNote>) -> FsChangeNoteHandle {
let handle = Arc::new(note);
self.notes.push(Arc::downgrade(&handle));
FsChangeNoteHandle(handle)
}
fn on_debounce_timer(&mut self) {
if !self.debounce_buffer.is_empty() {
self.notify();
@ -1153,7 +1267,7 @@ impl SyncWithVar {
file = p.into_owned();
}
let path = Arc::new(file);
let path = Arc::new(WatcherSyncWriteNote(file));
let var = var(init);
struct TaskData<R, W, O: VarValue> {
@ -1161,14 +1275,12 @@ impl SyncWithVar {
read_write: Mutex<(R, W)>,
wk_var: WeakArcVar<O>,
last_write: Atomic<Option<Instant>>,
modified: Atomic<Option<SystemTime>>,
}
let task_data = Arc::new(TaskData {
pending: Atomic::new(SyncFlags::empty()),
read_write: Mutex::new((read, write)),
wk_var: var.downgrade(),
last_write: Atomic::new(None),
modified: Atomic::new(None),
});
// task drains pending, drops handle if the var is dropped.
@ -1193,12 +1305,27 @@ impl SyncWithVar {
}
}
SyncEvent::Event(args) => {
if args.events_for_path(&path).next().is_some() {
if args.rescan() {
SyncFlags::atomic_insert(&task_data.pending, SyncFlags::READ);
} else {
let mut read = false;
'ev: for ev in args.changes_for_path(&path) {
for note in ev.notes::<WatcherSyncWriteNote>() {
if path.as_path() == note.as_path() {
// we caused this event
continue 'ev;
}
}
SyncFlags::atomic_insert(&task_data.pending, SyncFlags::READ);
read = true;
break;
}
if !read {
return;
}
}
}
SyncEvent::Init => {
if path.exists() {
SyncFlags::atomic_insert(&task_data.pending, SyncFlags::READ);
@ -1243,10 +1370,9 @@ impl SyncWithVar {
return;
};
{
let _note = WATCHER.annotate(path.clone());
write(value, WriteFile::open(path.to_path_buf()));
if let Ok(m) = std::fs::metadata(&*path).and_then(|m| m.modified()) {
task_data.modified.store(Some(m), Ordering::Relaxed);
}
if task_data.wk_var.strong_count() == 0 {
@ -1259,18 +1385,7 @@ impl SyncWithVar {
return;
}
let file = WatchFile::open(path.as_path());
if let Ok(f) = &file {
if let Ok(m) = f.metadata().and_then(|f| f.modified()) {
let last = task_data.modified.swap(Some(m), Ordering::Relaxed);
if last == Some(m) {
// already handled
return;
}
}
}
if let Some(update) = read(file) {
if let Some(update) = read(WatchFile::open(path.as_path())) {
if let Some(var) = task_data.wk_var.upgrade() {
SyncFlags::atomic_insert(&task_data.pending, SyncFlags::SKIP_WRITE);
var.set(update);
@ -1567,6 +1682,12 @@ impl Watchers {
}
fn allow(&mut self, r: &notify::Event) -> bool {
if let notify::EventKind::Access(_) = r.kind {
if !r.need_rescan() {
return false;
}
}
for (dir, w) in &mut self.dirs {
let mut matched = false;