Reorganized SyncWithVar to use only one Arc for all shared data.

This commit is contained in:
Samuel Guerra 2023-06-03 11:52:11 -03:00
parent 5ee4af0af0
commit 734dfc7e97
1 changed files with 50 additions and 42 deletions

View File

@ -26,7 +26,7 @@ use crate::{
text::Txt,
timer::{DeadlineHandle, TIMERS},
units::*,
var::*,
var::{types::WeakArcVar, *},
};
/// Application extension that provides file system change events and service.
@ -1139,32 +1139,39 @@ enum ReadEvent<'a> {
}
struct SyncWithVar {
task: Box<dyn Fn(&Arc<Atomic<SyncFlags>>, &WatcherHandle, SyncEvent) + Send + Sync>,
pending: Arc<Atomic<SyncFlags>>,
task: Box<dyn Fn(&WatcherHandle, SyncEvent) + Send + Sync>,
handle: WatcherHandle,
}
impl SyncWithVar {
fn new<O: VarValue>(
handle: WatcherHandle,
mut file: PathBuf,
init: O,
read: impl FnMut(io::Result<WatchFile>) -> Option<O> + Send + 'static,
write: impl FnMut(O, io::Result<WriteFile>) + Send + 'static,
) -> (Self, ArcVar<O>) {
fn new<O, R, W>(handle: WatcherHandle, mut file: PathBuf, init: O, read: R, write: W) -> (Self, ArcVar<O>)
where
O: VarValue,
R: FnMut(io::Result<WatchFile>) -> Option<O> + Send + 'static,
W: FnMut(O, io::Result<WriteFile>) + Send + 'static,
{
if let Ok(p) = file.absolutize() {
file = p.into_owned();
}
let path = Arc::new(file);
let var = var(init);
let pending = Arc::new(Atomic::new(SyncFlags::empty()));
let read_write = Arc::new(Mutex::new((read, write)));
let wk_var = var.downgrade();
let last_write = Arc::new(Atomic::new(None::<Instant>));
struct TaskData<R, W, O: VarValue> {
pending: Atomic<SyncFlags>,
read_write: Mutex<(R, W)>,
wk_var: WeakArcVar<O>,
last_write: Atomic<Option<Instant>>,
}
let task_data = Arc::new(TaskData {
pending: Atomic::new(SyncFlags::empty()),
read_write: Mutex::new((read, write)),
wk_var: var.downgrade(),
last_write: Atomic::new(None),
});
// task "drains" pending, drops handle if the var is dropped.
let task = Box::new(move |pending: &Arc<Atomic<SyncFlags>>, handle: &WatcherHandle, ev: SyncEvent| {
let var = match wk_var.upgrade() {
// task drains pending, drops handle if the var is dropped.
let task = Box::new(move |handle: &WatcherHandle, ev: SyncEvent| {
let var = match task_data.wk_var.upgrade() {
Some(v) => v,
None => {
handle.clone().force_drop();
@ -1176,9 +1183,9 @@ impl SyncWithVar {
match ev {
SyncEvent::Update(sync_debounce) => {
if var.is_new() && !SyncFlags::pop(pending, SyncFlags::SKIP_WRITE) {
if var.is_new() && !SyncFlags::pop(&task_data.pending, SyncFlags::SKIP_WRITE) {
debounce = Some(sync_debounce);
SyncFlags::atomic_insert(pending, SyncFlags::WRITE);
SyncFlags::atomic_insert(&task_data.pending, SyncFlags::WRITE);
} else {
return;
}
@ -1186,50 +1193,50 @@ impl SyncWithVar {
SyncEvent::Event(args) => {
// !!: SKIP_READ can skip correct changes (args can aggregate many events)
// check hash?
if args.events_for_path(&path).next().is_some() && !SyncFlags::pop(pending, SyncFlags::SKIP_READ) {
SyncFlags::atomic_insert(pending, SyncFlags::READ);
if args.events_for_path(&path).next().is_some() && !SyncFlags::pop(&task_data.pending, SyncFlags::SKIP_READ) {
SyncFlags::atomic_insert(&task_data.pending, SyncFlags::READ);
} else {
return;
}
}
SyncEvent::Init => {
if path.exists() {
SyncFlags::atomic_insert(pending, SyncFlags::READ);
SyncFlags::atomic_insert(&task_data.pending, SyncFlags::READ);
} else {
SyncFlags::atomic_insert(pending, SyncFlags::WRITE);
SyncFlags::atomic_insert(&task_data.pending, SyncFlags::WRITE);
}
}
SyncEvent::FlushShutdown => {
let _done = read_write.lock();
let _done = task_data.read_write.lock();
return;
}
};
drop(var);
if read_write.try_lock().is_none() {
if task_data.read_write.try_lock().is_none() {
// another spawn is already applying
return;
}
task::spawn_wait(clmv!(read_write, wk_var, path, handle, pending, last_write, || {
let mut read_write = read_write.lock();
task::spawn_wait(clmv!(task_data, path, handle, || {
let mut read_write = task_data.read_write.lock();
let (read, write) = &mut *read_write;
loop {
let w = SyncFlags::pop(&pending, SyncFlags::WRITE);
let r = SyncFlags::pop(&pending, SyncFlags::READ);
let w = SyncFlags::pop(&task_data.pending, SyncFlags::WRITE);
let r = SyncFlags::pop(&task_data.pending, SyncFlags::READ);
if w {
if let Some(d) = debounce {
if let Some(t) = last_write.load(Ordering::Relaxed) {
if let Some(t) = task_data.last_write.load(Ordering::Relaxed) {
let elapsed = t.elapsed();
if elapsed < d {
std::thread::sleep(d - elapsed);
}
}
last_write.store(Some(Instant::now()), Ordering::Relaxed);
task_data.last_write.store(Some(Instant::now()), Ordering::Relaxed);
}
let value = if let Some(var) = wk_var.upgrade() {
let value = if let Some(var) = task_data.wk_var.upgrade() {
var.get()
} else {
handle.force_drop();
@ -1237,21 +1244,21 @@ impl SyncWithVar {
};
write(value, WriteFile::open(path.to_path_buf()));
SyncFlags::atomic_insert(&pending, SyncFlags::SKIP_READ);
SyncFlags::atomic_insert(&task_data.pending, SyncFlags::SKIP_READ);
if wk_var.strong_count() == 0 {
if task_data.wk_var.strong_count() == 0 {
handle.force_drop();
return;
}
} else if r {
if wk_var.strong_count() == 0 {
if task_data.wk_var.strong_count() == 0 {
handle.force_drop();
return;
}
if let Some(update) = read(WatchFile::open(path.as_path())) {
if let Some(var) = wk_var.upgrade() {
SyncFlags::atomic_insert(&pending, SyncFlags::SKIP_WRITE);
if let Some(var) = task_data.wk_var.upgrade() {
SyncFlags::atomic_insert(&task_data.pending, SyncFlags::SKIP_WRITE);
var.set(update);
} else {
handle.force_drop();
@ -1264,9 +1271,10 @@ impl SyncWithVar {
}
}));
});
task(&pending, &handle, SyncEvent::Init);
(Self { task, pending, handle }, var)
task(&handle, SyncEvent::Init);
(Self { task, handle }, var)
}
// Match the event and flag variable update.
@ -1274,7 +1282,7 @@ impl SyncWithVar {
/// Returns if the variable is still alive.
pub fn on_event(&mut self, args: &FsChangesArgs) -> bool {
if !self.handle.is_dropped() {
(self.task)(&self.pending, &self.handle, SyncEvent::Event(args));
(self.task)(&self.handle, SyncEvent::Event(args));
}
!self.handle.is_dropped()
}
@ -1282,14 +1290,14 @@ impl SyncWithVar {
/// Returns if the variable is still alive.
fn retain(&mut self, sync_debounce: Duration) -> bool {
if !self.handle.is_dropped() {
(self.task)(&self.pending, &self.handle, SyncEvent::Update(sync_debounce));
(self.task)(&self.handle, SyncEvent::Update(sync_debounce));
}
!self.handle.is_dropped()
}
fn flush_shutdown(&mut self) {
if !self.handle.is_dropped() {
(self.task)(&self.pending, &self.handle, SyncEvent::FlushShutdown);
(self.task)(&self.handle, SyncEvent::FlushShutdown);
}
}
}