More var and config sync work.

This commit is contained in:
Samuel Guerra 2023-06-20 18:13:07 -03:00
parent f289ae54b5
commit 49bd2c5c5d
7 changed files with 126 additions and 105 deletions

View File

@ -44,6 +44,14 @@
* Refactor into an extension trait.
- Is more discoverable as an extension trait, maybe suggested by tooling (rustc, ra)?
```rust
/// Extensions methods for [`WINDOW`] contexts of windows open by [`WINDOWS`].
#[allow(non_camel_case_types)]
pub trait WINDOW_Ext {
}
impl WINDOW_Ext for WINDOW { }
```
# View-Process
@ -66,4 +74,14 @@
* Image paste some pixel columns swapped (wrap around start).
- Some corrupted pixels, probably same reason.
* Screenshot paste does not have scale-factor.
* Screenshot paste does not have scale-factor.
# Config
* Status race condition:
- Test `fallback_reset_entry` in a loop.
- Initial write starts immediately.
- At the "same time" the config updates to an actual value (status set to write).
- Initial write finishes, sets status to idle.
- Waiter is released.
- Actual value write starts.

View File

@ -221,7 +221,7 @@ fn concurrent_read_write() {
rmv_file_assert(&file);
let mut app = App::default().run_headless(false);
CONFIG.load(JsonConfig::sync(&file));
CONFIG.get("key", || Txt::from_static("default")).set("custom").unwrap();
CONFIG.get("key", || Txt::from_static("default/custom")).set("custom").unwrap();
app.run_task(async {
task::with_deadline(CONFIG.wait_idle(), 10.secs()).await.unwrap();
@ -243,7 +243,7 @@ fn concurrent_read_write() {
task::with_deadline(CONFIG.wait_idle(), 10.secs()).await.unwrap();
});
let var = CONFIG.get("key", || Txt::from_static("default"));
let var = CONFIG.get("key", || Txt::from_static("default/get"));
for _ in 0..8 {
assert_eq!("custom", var.get());
var.set("custom").unwrap();
@ -272,7 +272,7 @@ fn fallback_swap() {
let mut app = App::default().run_headless(false);
CONFIG.load(JsonConfig::sync(&fallback_cfg));
CONFIG.get("key", || Txt::from_static("default")).set("fallback").unwrap();
CONFIG.get("key", || Txt::from_static("default/fallback")).set("fallback").unwrap();
app.update(false).assert_wait();
app.run_task(async {
@ -284,7 +284,7 @@ fn fallback_swap() {
}
CONFIG.load(JsonConfig::sync(&main_prepared_cfg));
CONFIG.get("key", || Txt::from_static("default")).set("main").unwrap();
CONFIG.get("key", || Txt::from_static("default/main")).set("main").unwrap();
app.run_task(async {
task::with_deadline(CONFIG.wait_idle(), 10.secs()).await.unwrap();
@ -309,7 +309,7 @@ fn fallback_swap() {
app.update(false).assert_wait();
let key = CONFIG.get("key", || Txt::from_static("final-default"));
let key = CONFIG.get("key", || Txt::from_static("default/get"));
assert_eq!("fallback", key.get());
std::fs::rename(main_prepared_cfg, main_cfg).unwrap();
@ -338,7 +338,7 @@ fn fallback_reset() {
let mut app = App::default().run_headless(false);
CONFIG.load(JsonConfig::sync(&fallback_cfg));
CONFIG.get("key", || Txt::from_static("default")).set("fallback").unwrap();
CONFIG.get("key", || Txt::from_static("default/fallback")).set("fallback").unwrap();
app.update(false).assert_wait();
app.run_task(async {
@ -350,7 +350,7 @@ fn fallback_reset() {
}
CONFIG.load(JsonConfig::sync(&main_cfg));
CONFIG.get("key", || Txt::from_static("default")).set("main").unwrap();
CONFIG.get("key", || Txt::from_static("default/main")).set("main").unwrap();
app.run_task(async {
task::with_deadline(CONFIG.wait_idle(), 10.secs()).await.unwrap();
@ -375,7 +375,7 @@ fn fallback_reset() {
app.update(false).assert_wait();
let key = CONFIG.get("key", || Txt::from_static("final-default"));
let key = CONFIG.get("key", || Txt::from_static("default/get"));
assert_eq!("main", key.get());
CONFIG.load(MemoryConfig::default());
@ -406,7 +406,7 @@ fn fallback_reset_entry() {
let mut app = App::default().run_headless(false);
CONFIG.load(JsonConfig::sync(&fallback_cfg));
CONFIG.get("key", || Txt::from_static("default")).set("fallback").unwrap();
CONFIG.get("key", || Txt::from_static("default/fallback")).set("fallback").unwrap();
app.update(false).assert_wait();
app.run_task(async {
@ -418,7 +418,7 @@ fn fallback_reset_entry() {
}
CONFIG.load(JsonConfig::sync(&main_cfg));
CONFIG.get("key", || Txt::from_static("default")).set("main").unwrap();
CONFIG.get("key", || Txt::from_static("default/main")).set("main").unwrap();
app.run_task(async {
task::with_deadline(CONFIG.wait_idle(), 10.secs()).await.unwrap();
@ -448,7 +448,7 @@ fn fallback_reset_entry() {
.unwrap();
});
let key = cfg.get("key", || Txt::from_static("default"));
let key = cfg.get("key", || Txt::from_static("default/get"));
assert_eq!("main", key.get());
cfg.reset(&ConfigKey::from_static("key"));

View File

@ -765,12 +765,12 @@ impl<T: fmt::Debug + std::any::Any + Send + Sync> FsChangeNote for T {
#[must_use = "the note is removed when the handle is dropped"]
pub struct FsChangeNoteHandle(Arc<Arc<dyn FsChangeNote>>);
/// Annotation for file watcher events.
/// Annotation for file watcher events and var update tags.
///
/// Identifies the [`WATCHER.sync`] file that is currently being written to.
///
/// [`WATCHER.sync`]: WATCHER::sync
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
pub struct WatcherSyncWriteNote(PathBuf);
impl WatcherSyncWriteNote {
/// Deref.
@ -1026,6 +1026,7 @@ impl WatcherService {
file,
init,
open,
// read
clmv!(status, |d| {
status.set(S::reading());
match read(d) {
@ -1041,6 +1042,7 @@ impl WatcherService {
}
}
}),
// on_modify
clmv!(status, || {
status.set(S::idle());
}),
@ -1094,6 +1096,7 @@ impl WatcherService {
dir,
init,
if recursive { open_recursive } else { open },
// read
clmv!(status, |d| {
status.set(S::reading());
match read(d) {
@ -1109,6 +1112,7 @@ impl WatcherService {
}
}
}),
// on_modify
clmv!(status, || {
status.set(S::idle());
}),
@ -1127,7 +1131,7 @@ impl WatcherService {
) -> ArcVar<O> {
let handle = self.watch(file.clone());
let (sync, var) = SyncWithVar::new(handle, file, init, read, write, || {});
let (sync, var) = SyncWithVar::new(handle, file, init, read, write, |_| {});
self.sync_with_var.push(sync);
var
}
@ -1146,20 +1150,16 @@ impl WatcherService {
let handle = self.watch(file.clone());
let status = var(S::reading());
let next_var_update_status = Arc::new(Atomic::new(S::writing as fn() -> S));
let (sync, var) = SyncWithVar::new(
handle,
file,
init,
clmv!(status, next_var_update_status, |f| {
// read
clmv!(status, |f| {
status.set(S::reading());
match read(f) {
Ok(r) => {
if r.is_some() {
// status handled by `on_modify`.
next_var_update_status.store(S::idle, atomic::Ordering::Relaxed);
} else {
if r.is_none() {
status.set(S::idle());
}
r
@ -1170,8 +1170,9 @@ impl WatcherService {
}
}
}),
// write
clmv!(status, |o, f| {
status.set(S::writing());
status.set(S::writing()); // init write
match write(o, f) {
Ok(()) => {
status.set(S::idle());
@ -1181,9 +1182,9 @@ impl WatcherService {
}
}
}),
clmv!(status, || {
let status_fn = next_var_update_status.swap(S::writing, atomic::Ordering::Relaxed);
status.set(status_fn());
// hook&modify
clmv!(status, |is_read| {
status.set(if is_read { S::idle() } else { S::writing() });
}),
);
@ -1356,30 +1357,48 @@ struct SyncWithVar {
handle: WatcherHandle,
}
impl SyncWithVar {
fn new<O, R, W, U>(handle: WatcherHandle, mut file: PathBuf, init: O, read: R, write: W, on_modify: U) -> (Self, ArcVar<O>)
fn new<O, R, W, U>(handle: WatcherHandle, mut file: PathBuf, init: O, read: R, write: W, var_hook_and_modify: U) -> (Self, ArcVar<O>)
where
O: VarValue,
R: FnMut(io::Result<WatchFile>) -> Option<O> + Send + 'static,
W: FnMut(O, io::Result<WriteFile>) + Send + 'static,
U: Fn() + Send + Sync + 'static,
U: Fn(bool) + Send + Sync + 'static,
{
if let Ok(p) = file.absolutize() {
file = p.into_owned();
}
let path = Arc::new(WatcherSyncWriteNote(file));
let var = var(init);
let latest_from_read = Arc::new(AtomicBool::new(false));
let on_modify = Arc::new(on_modify);
let var_hook_and_modify = Arc::new(var_hook_and_modify);
let var = var(init);
var.hook(Box::new(clmv!(
path,
latest_from_read,
var_hook_and_modify,
|args: &VarHookArgs| {
let is_read = args.downcast_tags::<Arc<WatcherSyncWriteNote>>().any(|n| n == &path);
latest_from_read.store(is_read, Ordering::Relaxed);
var_hook_and_modify(is_read);
true
}
)))
.perm();
type PendingFlag = u8;
const READ: PendingFlag = 0b01;
const WRITE: PendingFlag = 0b11;
struct TaskData<R, W, O: VarValue> {
pending: Atomic<SyncFlags>,
pending: Atomic<PendingFlag>,
read_write: Mutex<(R, W)>,
wk_var: WeakArcVar<O>,
last_write: Atomic<Option<Instant>>,
}
let task_data = Arc::new(TaskData {
pending: Atomic::new(SyncFlags::empty()),
pending: Atomic::new(0),
read_write: Mutex::new((read, write)),
wk_var: var.downgrade(),
last_write: Atomic::new(None),
@ -1397,20 +1416,21 @@ impl SyncWithVar {
let mut debounce = None;
let mut pending = 0;
match ev {
SyncEvent::Update(sync_debounce) => {
if var.is_new() && !SyncFlags::pop(&task_data.pending, SyncFlags::SKIP_WRITE) {
if var.is_new() && !latest_from_read.load(Ordering::Relaxed) {
debounce = Some(sync_debounce);
SyncFlags::atomic_insert(&task_data.pending, SyncFlags::WRITE);
pending |= WRITE;
} else {
return;
}
}
SyncEvent::Event(args) => {
if args.rescan() {
SyncFlags::atomic_insert(&task_data.pending, SyncFlags::READ);
pending |= READ;
} else {
let mut read = false;
'ev: for ev in args.changes_for_path(&path) {
for note in ev.notes::<WatcherSyncWriteNote>() {
if path.as_path() == note.as_path() {
@ -1419,20 +1439,19 @@ impl SyncWithVar {
}
}
SyncFlags::atomic_insert(&task_data.pending, SyncFlags::READ);
read = true;
pending |= READ;
break;
}
if !read {
if pending == 0 {
return;
}
}
}
SyncEvent::Init => {
if path.exists() {
SyncFlags::atomic_insert(&task_data.pending, SyncFlags::READ);
pending |= READ;
} else {
SyncFlags::atomic_insert(&task_data.pending, SyncFlags::WRITE);
pending |= WRITE;
}
}
SyncEvent::FlushShutdown => {
@ -1442,19 +1461,20 @@ impl SyncWithVar {
};
drop(var);
task_data.pending.fetch_or(pending, Ordering::Relaxed);
if task_data.read_write.try_lock().is_none() {
// another spawn is already applying
return;
}
task::spawn_wait(clmv!(task_data, path, handle, on_modify, || {
task::spawn_wait(clmv!(task_data, path, var_hook_and_modify, handle, || {
let mut read_write = task_data.read_write.lock();
let (read, write) = &mut *read_write;
loop {
let w = SyncFlags::pop(&task_data.pending, SyncFlags::WRITE);
let r = SyncFlags::pop(&task_data.pending, SyncFlags::READ);
let pending = task_data.pending.swap(0, Ordering::Relaxed);
if w {
if pending == WRITE {
if let Some(d) = debounce {
if let Some(t) = task_data.last_write.load(Ordering::Relaxed) {
let elapsed = t.elapsed();
@ -1481,7 +1501,7 @@ impl SyncWithVar {
handle.force_drop();
return;
}
} else if r {
} else if pending == READ {
if task_data.wk_var.strong_count() == 0 {
handle.force_drop();
return;
@ -1489,11 +1509,10 @@ impl SyncWithVar {
if let Some(update) = read(WatchFile::open(path.as_path())) {
if let Some(var) = task_data.wk_var.upgrade() {
SyncFlags::atomic_insert(&task_data.pending, SyncFlags::SKIP_WRITE);
var.modify(clmv!(on_modify, |vm| {
var.modify(clmv!(path, var_hook_and_modify, |vm| {
vm.set(update);
on_modify();
vm.push_tag(path);
var_hook_and_modify(true);
}));
} else {
handle.force_drop();
@ -1542,40 +1561,6 @@ enum SyncEvent<'a> {
Init,
FlushShutdown,
}
bitflags! {
#[derive(Clone, Copy)]
struct SyncFlags: u8 {
const READ = 0b0000_0001;
const WRITE = 0b0000_0010;
const SKIP_WRITE = 0b0010_0000;
}
}
impl SyncFlags {
fn atomic_insert(f: &Atomic<Self>, flag: Self) {
let _ = f.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |mut f| {
if f.contains(flag) {
None
} else {
f.insert(flag);
Some(f)
}
});
}
fn pop(f: &Atomic<Self>, flag: Self) -> bool {
let mut contains = false;
let _ = f.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |mut f| {
if f.contains(flag) {
contains = true;
f.remove(flag);
Some(f)
} else {
None
}
});
contains
}
}
struct Watchers {
dirs: HashMap<PathBuf, DirWatcher>,

View File

@ -1076,6 +1076,25 @@ impl<T: VarValue> Clone for OnVarArgs<T> {
}
}
/// Args for [`Var::trace_value`].
pub struct TraceValueArgs<'a, T: VarValue> {
args: &'a VarHookArgs<'a>,
_type: PhantomData<&'a T>,
}
impl<'a, T: VarValue> ops::Deref for TraceValueArgs<'a, T> {
type Target = VarHookArgs<'a>;
fn deref(&self) -> &Self::Target {
self.args
}
}
impl<'a, T: VarValue> TraceValueArgs<'a, T> {
/// Strongly-typed reference to the new value.
pub fn value(&self) -> &'a T {
self.args.downcast_value::<T>().unwrap()
}
}
/// Represents an observable value.
///
/// All variable types can be read, some can update, variables update only in between app updates so
@ -1863,13 +1882,22 @@ pub trait Var<T: VarValue>: IntoVar<T, Var = Self> + AnyVar + Clone {
/// [`actual_var`]: Var::actual_var
fn trace_value<E, S>(&self, mut enter_value: E) -> VarHandle
where
E: FnMut(&OnVarArgs<T>) -> S + Send + 'static,
E: FnMut(&TraceValueArgs<T>) -> S + Send + 'static,
S: Send + 'static,
{
let mut span = Some(enter_value(&OnVarArgs::new(self.get(), vec![])));
self.on_pre_new(app_hn!(|args, _| {
let span = self.with(|v| {
enter_value(&TraceValueArgs {
args: &VarHookArgs::new(v, false, &[]),
_type: PhantomData,
})
});
let data = Mutex::new((Some(span), enter_value));
self.hook(Box::new(move |args| {
let mut data = data.lock();
let (span, enter_value) = &mut *data;
let _ = span.take();
span = Some(enter_value(args));
*span = Some(enter_value(&TraceValueArgs { args, _type: PhantomData }));
true
}))
}

View File

@ -1021,8 +1021,8 @@ mod cow {
let base_values = Arc::new(Mutex::new(vec![]));
let cow_values = Arc::new(Mutex::new(vec![]));
base.trace_value(clmv!(base_values, |v| base_values.lock().push(v.value))).perm();
cow.trace_value(clmv!(cow_values, |v| cow_values.lock().push(v.value))).perm();
base.trace_value(clmv!(base_values, |v| base_values.lock().push(*v.value()))).perm();
cow.trace_value(clmv!(cow_values, |v| cow_values.lock().push(*v.value()))).perm();
base.set(1);
app.update(false).assert_wait();
@ -1069,8 +1069,8 @@ mod multi {
let a_values = Arc::new(Mutex::new(vec![]));
let b_values = Arc::new(Mutex::new(vec![]));
a.trace_value(clmv!(a_values, |v| a_values.lock().push(v.value))).perm();
b.trace_value(clmv!(b_values, |v| b_values.lock().push(v.value))).perm();
a.trace_value(clmv!(a_values, |v| a_values.lock().push(*v.value()))).perm();
b.trace_value(clmv!(b_values, |v| b_values.lock().push(*v.value()))).perm();
assert!(!a.get());
assert_eq!(b.get(), 0);

View File

@ -273,17 +273,18 @@ impl VARS {
drop(vars);
update_each_and_bindings(updates, 0);
vars = VARS_SV.write();
vars.updating_thread = None;
if !vars.updates_after.get_mut().is_empty() {
drop(vars);
let depth = depth + 1;
if depth == 10 {
// high-pressure from worker threads, skip
return;
}
drop(vars);
Self::apply_updates_and_after(depth)
}
}

View File

@ -1377,17 +1377,6 @@ impl fmt::Debug for WindowLoadingHandle {
}
}
/*
/// Extensions methods for [`WINDOW`] contexts of windows open by [`WINDOWS`].
#[allow(non_camel_case_types)]
pub trait WINDOW_Ext {
}
impl WINDOW_Ext for WINDOW { }
*/
/// Control and variables of the context [`WINDOW`] created using [`WINDOWS`].
#[allow(non_camel_case_types)]
pub struct WINDOW_CTRL;