zng_ext_fs_watcher/
lib.rs

1#![doc(html_favicon_url = "https://zng-ui.github.io/res/zng-logo-icon.png")]
2#![doc(html_logo_url = "https://zng-ui.github.io/res/zng-logo.png")]
3//!
4//! File system events and service.
5//!
6//! # Events
7//!
8//! Events this extension provides.
9//!
10//! * [`FS_CHANGES_EVENT`]
11//!
12//! # Services
13//!
14//! Services this extension provides.
15//!
16//! * [`WATCHER`]
17//!
18//! # Crate
19//!
20#![doc = include_str!(concat!("../", std::env!("CARGO_PKG_README")))]
21// suppress nag about very simple boxed closure signatures.
22#![expect(clippy::type_complexity)]
23#![warn(unused_extern_crates)]
24#![warn(missing_docs)]
25
26use std::{
27    any::Any,
28    fmt, fs,
29    io::{self, Write as _},
30    ops,
31    path::{Path, PathBuf},
32    sync::Arc,
33    time::Duration,
34};
35
36use parking_lot::Mutex;
37use path_absolutize::Absolutize;
38use zng_app::{
39    DInstant, INSTANT,
40    event::event_args,
41    handler::{Handler, HandlerExt as _},
42};
43use zng_clone_move::clmv;
44use zng_handle::Handle;
45use zng_txt::Txt;
46use zng_unit::TimeUnits;
47use zng_var::{AnyVar, BoxAnyVarValue, Var, VarHandle, VarValue, WeakAnyVar, var};
48
49mod service;
50use service::*;
51
52mod lock;
53use lock::*;
54
/// File system watcher service.
///
/// This is mostly a wrapper around the [`notify`](https://docs.rs/notify) crate, integrating it with events and variables.
///
/// All operations are methods of this unit value, for example `WATCHER.watch(path)`.
pub struct WATCHER;
59impl WATCHER {
60    /// Gets a read-write variable that defines interval awaited between each [`FS_CHANGES_EVENT`]. If
61    /// a watched path is constantly changing an event will be emitted every elapse of this interval,
62    /// the event args will contain a list of all the changes observed during the interval.
63    ///
64    /// Note that the first event notifies immediately, only subsequent events within this interval are debounced.
65    ///
66    /// Is `100.ms()` by default.
67    pub fn debounce(&self) -> Var<Duration> {
68        WATCHER_SV.read().debounce.clone()
69    }
70
71    /// Gets a read-write variable that defines interval awaited between each [`sync`] write.
72    ///
73    /// Is `100.ms()` by default.
74    ///
75    /// Note that each [`sync`] is debounced in isolation and the first write happens immediately.
76    ///
77    /// [`sync`]: WATCHER::sync
78    pub fn sync_debounce(&self) -> Var<Duration> {
79        WATCHER_SV.read().sync_debounce.clone()
80    }
81
82    /// Gets a read-write variable that defines the fallback poll watcher interval.
83    ///
84    /// When an efficient watcher cannot be used a poll watcher fallback is used, the poll watcher reads
85    /// the directory or path every elapse of this interval. The poll watcher is also used for paths that
86    /// do not exist yet, that is also affected by this interval.
87    ///
88    /// Is `1.secs()` by default.
89    pub fn poll_interval(&self) -> Var<Duration> {
90        WATCHER_SV.read().poll_interval.clone()
91    }
92
93    /// Maximum time the service keeps the process alive to finish pending IO operations when the app shuts down.
94    ///
95    /// Is 1 minute by default.
96    pub fn shutdown_timeout(&self) -> Var<Duration> {
97        WATCHER_SV.read().shutdown_timeout.clone()
98    }
99
100    /// Enable [file change events] for the `file`.
101    ///
102    /// Returns a handle that will stop the file watch when dropped, if there is no other active handler for the same file.
103    ///
104    /// Note that this is implemented by actually watching the parent directory and filtering the events, this is done
105    /// to ensure the watcher survives operations that remove the file and then move another file to the same path.
106    ///
107    /// See [`watch_dir`] for more details.
108    ///
109    /// [`watch_dir`]: WATCHER::watch_dir
110    /// [file change events]: FS_CHANGES_EVENT
111    pub fn watch(&self, file: impl Into<PathBuf>) -> WatcherHandle {
112        WATCHER_SV.write().watch(file.into())
113    }
114
115    /// Enable [file change events] for files inside `dir`, also include inner directories if `recursive` is `true`.
116    ///
117    /// Returns a handle that will stop the dir watch when dropped, if there is no other active handler for the same directory.
118    ///
119    /// The directory will be watched using an OS specific efficient watcher provided by the [`notify`](https://docs.rs/notify) crate. If there is
120    /// any error creating the watcher, such as if the directory does not exist yet a slower polling watcher will retry periodically    
121    /// until the efficient watcher can be created or the handle is dropped.
122    ///
123    /// [file change events]: FS_CHANGES_EVENT
124    pub fn watch_dir(&self, dir: impl Into<PathBuf>, recursive: bool) -> WatcherHandle {
125        WATCHER_SV.write().watch_dir(dir.into(), recursive)
126    }
127
128    /// Read a file into a variable, the `init` value will start the variable and the `read` closure will be called
129    /// once immediately and every time the file changes, if the closure returns `Some(O)` the variable updates with the new value.
130    ///
131    /// Dropping the variable drops the read watch. The `read` closure is non-blocking, it is called in a [`task::wait`]
132    /// background thread.
133    ///
134    /// [`task::wait`]: zng_task::wait
135    pub fn read<O: VarValue>(
136        &self,
137        file: impl Into<PathBuf>,
138        init: O,
139        mut read: impl FnMut(io::Result<WatchFile>) -> Option<O> + Send + 'static,
140    ) -> Var<O> {
141        let out = var(init);
142        self.read_impl(
143            absolutize_or_pass(file.into()),
144            out.as_any().downgrade(),
145            Box::new(move |r| read(r).map(BoxAnyVarValue::new)),
146        );
147        out.read_only()
148    }
149    fn read_impl(
150        &self,
151        file: PathBuf,
152        out: WeakAnyVar,
153        read: Box<dyn FnMut(io::Result<WatchFile>) -> Option<BoxAnyVarValue> + Send + 'static>,
154    ) {
155        let _handle = WATCHER.watch(file.clone());
156        struct Data {
157            file: PathBuf,
158            read: Mutex<Box<dyn FnMut(io::Result<WatchFile>) -> Option<BoxAnyVarValue> + Send + 'static>>,
159        }
160        let data = Arc::new(Data {
161            file,
162            read: Mutex::new(read),
163        });
164        fn spawn_read(data: &Arc<Data>, out: AnyVar) {
165            zng_task::spawn_wait(clmv!(data, || {
166                if let Some(r) = data.read.lock()(WatchFile::open(&data.file)) {
167                    // did read update, set output
168                    out.modify(clmv!(data, |o| {
169                        if o.set(r) {
170                            // tag to avoid write back when `read_impl` is used in `write_impl`
171                            o.push_tag(WatcherSyncWriteNote(data.file.clone()));
172                        }
173                    }));
174                }
175            }));
176        }
177        if data.file.exists() {
178            // read initial value
179            spawn_read(&data, out.upgrade().unwrap());
180        }
181        FS_CHANGES_EVENT
182            .hook(move |args| {
183                let _hold = &_handle;
184
185                if let Some(out) = out.upgrade() {
186                    if has_sync_changes(args, &data.file) {
187                        // has changes, spawn read
188                        spawn_read(&data, out);
189                    }
190                    true
191                } else {
192                    // out var dropped
193                    false
194                }
195            })
196            .perm();
197    }
198
199    /// Same operation as [`read`] but also tracks the operation status in a second var.
200    ///
201    /// The status variable is set to [`WatcherReadStatus::reading`] as soon as `read` starts and
202    /// is set to [`WatcherReadStatus::idle`] when read returns. If read returns a value the status
203    /// only updates to idle  when the new value is available on the var, or because read the same value.
204    ///
205    /// [`read`]: Self::read
206    pub fn read_status<O, S, E>(
207        &self,
208        file: impl Into<PathBuf>,
209        init: O,
210        mut read: impl FnMut(io::Result<WatchFile>) -> Result<Option<O>, E> + Send + 'static,
211    ) -> (Var<O>, Var<S>)
212    where
213        O: VarValue,
214        S: WatcherReadStatus<E>,
215        E: Any,
216    {
217        let out = var(init);
218        let status = var(S::idle());
219        self.read_status_impl(
220            absolutize_or_pass(file.into()),
221            out.as_any().downgrade(),
222            status.as_any().clone(),
223            Box::new(move |r| match read(r) {
224                Ok(Some(r)) => Ok(Some(BoxAnyVarValue::new(r))),
225                Ok(None) => Ok(None),
226                Err(e) => Err(Box::new(e)),
227            }),
228            AnyWatcherReadStatus::new::<E, S>(),
229        );
230        (out.read_only(), status.read_only())
231    }
232    fn read_status_impl(
233        &self,
234        file: PathBuf,
235        out: WeakAnyVar,
236        status: AnyVar,
237        read: Box<dyn FnMut(io::Result<WatchFile>) -> Result<Option<BoxAnyVarValue>, Box<dyn Any>> + Send + 'static>,
238        status_mtds: AnyWatcherReadStatus,
239    ) {
240        let _handle = WATCHER.watch(file.clone());
241        struct Data {
242            file: PathBuf,
243            read: Mutex<Box<dyn FnMut(io::Result<WatchFile>) -> Result<Option<BoxAnyVarValue>, Box<dyn Any>> + Send + 'static>>,
244            status: AnyVar,
245            status_mtds: AnyWatcherReadStatus,
246        }
247        let data = Arc::new(Data {
248            file,
249            read: Mutex::new(read),
250            status,
251            status_mtds,
252        });
253        fn spawn_read(data: &Arc<Data>, out: AnyVar) {
254            data.status.set((data.status_mtds.reading)());
255            zng_task::spawn_wait(clmv!(data, || {
256                let mut read = match data.read.try_lock() {
257                    Some(l) => l,
258                    None => return, // another spawn either already has read lock on the file or will acquire it
259                };
260                match read(WatchFile::open(&data.file)) {
261                    Ok(u) => {
262                        if let Some(r) = u {
263                            // did read update, set output
264                            out.modify(clmv!(data, |o| {
265                                if o.set(r) {
266                                    // tag to avoid write back when `read_status_impl` is used in `sync_status_impl`
267                                    o.push_tag(WatcherSyncWriteNote(data.file.clone()));
268                                }
269                            }));
270                        }
271                        data.status.set((data.status_mtds.idle)());
272                    }
273                    Err(e) => {
274                        // read error
275                        data.status.set((data.status_mtds.read_error)(Box::new(e)));
276                    }
277                }
278            }));
279        }
280        if data.file.exists() {
281            // read initial value
282            spawn_read(&data, out.upgrade().unwrap());
283        }
284        FS_CHANGES_EVENT
285            .hook(move |args| {
286                let _hold = &_handle;
287                if let Some(out) = out.upgrade() {
288                    if has_sync_changes(args, &data.file) {
289                        // has changes, spawn read
290                        spawn_read(&data, out);
291                    }
292                    true
293                } else {
294                    // out dropped
295                    false
296                }
297            })
298            .perm();
299    }
300
301    /// Read a directory into a variable, the `init` value will start the variable and the `read` closure will be called
302    /// once immediately and every time any changes happen inside the dir, if the closure returns `Some(O)` the variable updates with the new value.
303    ///
304    /// The `read` closure parameter is a directory walker from the [`walkdir`](https://docs.rs/walkdir) crate.
305    ///
306    /// The directory walker is pre-configured to skip the `dir` itself and to have a max-depth of 1 if not `recursive`, these configs can.
307    ///
308    /// Dropping the variable drops the read watch. The `read` closure is non-blocking, it is called in a [`task::wait`]
309    /// background thread.
310    ///
311    /// [`task::wait`]: zng_task::wait
312    pub fn read_dir<O: VarValue>(
313        &self,
314        dir: impl Into<PathBuf>,
315        recursive: bool,
316        init: O,
317        mut read: impl FnMut(walkdir::WalkDir) -> Option<O> + Send + 'static,
318    ) -> Var<O> {
319        let out = var(init);
320        self.read_dir_impl(
321            absolutize_or_pass(dir.into()),
322            out.as_any().downgrade(),
323            recursive,
324            Box::new(move |r| read(r).map(BoxAnyVarValue::new)),
325        );
326        out.read_only()
327    }
328    fn read_dir_impl(
329        &self,
330        dir: PathBuf,
331        out: WeakAnyVar,
332        recursive: bool,
333        read: Box<dyn FnMut(walkdir::WalkDir) -> Option<BoxAnyVarValue> + Send + 'static>,
334    ) {
335        let _handle = WATCHER.watch_dir(dir.clone(), recursive);
336        struct Data {
337            dir: PathBuf,
338            read: Mutex<Box<dyn FnMut(walkdir::WalkDir) -> Option<BoxAnyVarValue> + Send + 'static>>,
339        }
340        let data = Arc::new(Data {
341            dir,
342            read: Mutex::new(read),
343        });
344        fn spawn_read(data: &Arc<Data>, recursive: bool, out: AnyVar) {
345            zng_task::spawn_wait(clmv!(data, || {
346                let dir = if recursive {
347                    walkdir::WalkDir::new(&data.dir).min_depth(1)
348                } else {
349                    walkdir::WalkDir::new(&data.dir).min_depth(1).max_depth(1)
350                };
351                if let Some(r) = data.read.lock()(dir) {
352                    // did read update, set output
353                    out.modify(clmv!(data, |o| {
354                        if o.set(r) {
355                            // tag just for parity with `read`
356                            o.push_tag(WatcherSyncWriteNote(data.dir.clone()));
357                        }
358                    }));
359                }
360            }));
361        }
362        if data.dir.exists() {
363            // read initial value
364            spawn_read(&data, recursive, out.upgrade().unwrap());
365        }
366        FS_CHANGES_EVENT
367            .hook(move |args| {
368                let _hold = &_handle;
369                if let Some(out) = out.upgrade() {
370                    if has_sync_changes(args, &data.dir) {
371                        // has changes, spawn read
372                        spawn_read(&data, recursive, out);
373                    }
374                    true
375                } else {
376                    // out var dropped
377                    false
378                }
379            })
380            .perm();
381    }
382
383    /// Same operation as [`read_dir`] but also tracks the operation status in a second var.
384    ///
385    /// The status variable is set to [`WatcherReadStatus::reading`] as soon as `read` starts and
386    /// is set to [`WatcherReadStatus::idle`] when read returns. If read returns a value the status
387    /// only updates to idle when the new value is available on the var, or because read the same value.
388    ///
389    /// [`read_dir`]: Self::read_dir
390    pub fn read_dir_status<O, S, E>(
391        &self,
392        dir: impl Into<PathBuf>,
393        recursive: bool,
394        init: O,
395        mut read: impl FnMut(walkdir::WalkDir) -> Result<Option<O>, E> + Send + 'static,
396    ) -> (Var<O>, Var<S>)
397    where
398        O: VarValue,
399        S: WatcherReadStatus<E>,
400        E: Any,
401    {
402        let out = var(init);
403        let status = var(S::idle());
404
405        self.read_dir_status_impl(
406            absolutize_or_pass(dir.into()),
407            out.as_any().downgrade(),
408            status.as_any().clone(),
409            recursive,
410            Box::new(move |r| match read(r) {
411                Ok(Some(r)) => Ok(Some(BoxAnyVarValue::new(r))),
412                Ok(None) => Ok(None),
413                Err(e) => Err(Box::new(e)),
414            }),
415            AnyWatcherReadStatus::new::<E, S>(),
416        );
417        (out.read_only(), status.read_only())
418    }
419    fn read_dir_status_impl(
420        &self,
421        dir: PathBuf,
422        out: WeakAnyVar,
423        status: AnyVar,
424        recursive: bool,
425        read: Box<dyn FnMut(walkdir::WalkDir) -> Result<Option<BoxAnyVarValue>, Box<dyn Any>> + Send + 'static>,
426        status_mtds: AnyWatcherReadStatus,
427    ) {
428        let _handle = WATCHER.watch_dir(dir.clone(), recursive);
429        struct Data {
430            dir: PathBuf,
431            read: Mutex<Box<dyn FnMut(walkdir::WalkDir) -> Result<Option<BoxAnyVarValue>, Box<dyn Any>> + Send + 'static>>,
432            status: AnyVar,
433            status_mtds: AnyWatcherReadStatus,
434        }
435        let data = Arc::new(Data {
436            dir,
437            read: Mutex::new(read),
438            status,
439            status_mtds,
440        });
441        fn spawn_read(data: &Arc<Data>, recursive: bool, out: AnyVar) {
442            data.status.set((data.status_mtds.reading)());
443            zng_task::spawn_wait(clmv!(data, || {
444                let dir = if recursive {
445                    walkdir::WalkDir::new(&data.dir).min_depth(1)
446                } else {
447                    walkdir::WalkDir::new(&data.dir).min_depth(1).max_depth(1)
448                };
449                match data.read.lock()(dir) {
450                    Ok(u) => {
451                        if let Some(r) = u {
452                            // did read update, set output
453                            out.modify(clmv!(data, |o| {
454                                if o.set(r) {
455                                    // tag just for parity with `read_status`
456                                    o.push_tag(WatcherSyncWriteNote(data.dir.clone()));
457                                }
458                            }));
459                        }
460                        data.status.set((data.status_mtds.idle)());
461                    }
462                    Err(e) => {
463                        data.status.set((data.status_mtds.read_error)(Box::new(e)));
464                    }
465                }
466            }));
467        }
468        if data.dir.exists() {
469            spawn_read(&data, recursive, out.upgrade().unwrap());
470        }
471        FS_CHANGES_EVENT
472            .hook(move |args| {
473                let _hold = &_handle;
474                if let Some(out) = out.upgrade() {
475                    if has_sync_changes(args, &data.dir) {
476                        // has changes, spawn read
477                        spawn_read(&data, recursive, out);
478                    }
479                    true
480                } else {
481                    // output var dropped
482                    false
483                }
484            })
485            .perm();
486    }
487
488    /// Bind a file with a variable, the `file` will be `read` when it changes and be `write` when the variable changes,
489    /// writes are only applied on success and will not cause a `read` on the same sync task. The `init` value is used to
490    /// create the variable, if the `file` exists it will be `read` once at the beginning.
491    ///
492    /// Dropping the variable drops the read watch. The `read` and `write` closures are non-blocking, they are called in a [`task::wait`]
493    /// background thread.
494    ///
495    /// # Sync
496    ///
497    /// The file synchronization ensures that the file is only actually modified when write is finished by writing
498    /// to a temporary file and committing a replace only if the write succeeded. The file is write-locked for the duration
499    /// of `write` call, but the contents are not touched until commit. See [`WriteFile`] for more details.
500    ///
501    /// The service blocks on app exit until all writes commit or cancel. See [`WATCHER::shutdown_timeout`] for
502    /// more details.
503    ///
504    /// ## Read Errors
505    ///
506    /// Not-found errors are handled by the watcher by calling `write` using the current variable value, other read errors
507    /// are passed to `read`. If `read` returns a value for an error the `write` closure is called to override the file,
508    /// otherwise only the variable is set and this variable update does not cause a `write`.
509    ///
510    /// ## Write Errors
511    ///
512    /// If `write` fails the file is not touched and the temporary file is removed, if the file path
513    /// does not exit all missing parent folders and the file will be created automatically before the `write`
514    /// call.
515    ///
516    /// Note that [`WriteFile::commit`] must be called to flush the temporary file and attempt to rename
517    /// it, if the file is dropped without commit it will cancel and log an error, you must call [`WriteFile::cancel`]
518    /// to correctly avoid writing.
519    ///
520    /// If the cleanup after commit fails the error is logged and ignored.
521    ///
522    /// If write fails to even create the file and/or acquire a write lock on it this error is the input for
523    /// the `write` closure.
524    ///
525    /// ## Error Handling
526    ///
527    /// You can call services or set other variables from inside the `read` and `write` closures, this can be
528    /// used to get a signal out to handle the error, perhaps drop the sync var (to stop watching), alert the user that the
529    /// file is out of sync and initiate some sort of recovery routine.
530    ///
531    /// If the file synchronization is not important you can just ignore it, the watcher will try again
532    /// on the next variable or file update.
533    ///
534    /// ## Status
535    ///
536    /// Note that `read` and `write` run in background task threads, so if you are tracking the operation
537    /// status in a separate variable you may end-up with synchronization bugs between the status variable
538    /// and the actual result variable, you can use [`sync_status`] to implement racing-free status tracking.
539    ///
540    /// [`sync_status`]: Self::sync_status
541    /// [`task::wait`]: zng_task::wait
542    pub fn sync<O: VarValue>(
543        &self,
544        file: impl Into<PathBuf>,
545        init: O,
546        mut read: impl FnMut(io::Result<WatchFile>) -> Option<O> + Send + 'static,
547        mut write: impl FnMut(O, io::Result<WriteFile>) + Send + 'static,
548    ) -> Var<O> {
549        let out = var(init);
550        self.sync_impl(
551            absolutize_or_pass(file.into()),
552            out.as_any().clone(),
553            Box::new(move |r| read(r).map(BoxAnyVarValue::new)),
554            Box::new(move |o, r| write(o.downcast().unwrap(), r)),
555        );
556        out
557    }
558    fn sync_impl(
559        &self,
560        file: PathBuf,
561        out: AnyVar,
562        read: Box<dyn FnMut(io::Result<WatchFile>) -> Option<BoxAnyVarValue> + Send + 'static>,
563        write: Box<dyn FnMut(BoxAnyVarValue, io::Result<WriteFile>) + Send + 'static>,
564    ) {
565        self.read_impl(file.clone(), out.downgrade(), read);
566
567        struct DataWrite {
568            f: Box<dyn FnMut(BoxAnyVarValue, io::Result<WriteFile>) + Send + 'static>,
569            last: DInstant,
570        }
571        struct Data {
572            note: Arc<WatcherSyncWriteNote>,
573            write: Mutex<DataWrite>,
574            write_value: Mutex<Option<BoxAnyVarValue>>,
575            debounce_flush: Arc<SyncFlushLock>,
576        }
577        let data = Arc::new(Data {
578            note: Arc::new(WatcherSyncWriteNote(file)),
579            write: Mutex::new(DataWrite {
580                f: write,
581                last: DInstant::EPOCH,
582            }),
583            write_value: Mutex::new(None),
584            debounce_flush: Arc::new(SyncFlushLock::new()),
585        });
586        WATCHER_SV.write().push_sync_flush(&data.debounce_flush);
587
588        out.hook(move |a| {
589            // skip if value was set from `read_impl`
590            for tag in a.downcast_tags::<WatcherSyncWriteNote>() {
591                if *tag == *data.note {
592                    return true;
593                }
594            }
595
596            // spawn write
597            let mut d = data.write_value.lock();
598            let is_running = d.is_some() || data.write.try_lock().is_none();
599            *d = Some(a.value().clone_boxed());
600            if is_running {
601                return true;
602            }
603            zng_task::spawn_wait(clmv!(data, || {
604                // wait other write tasks
605                let mut write = data.write.lock();
606                // wait debounce of APP.on_deinit signal, holds _guard to block on_deinit so the process is not killed while writing
607                let _guard = data
608                    .debounce_flush
609                    .begin_write(WATCHER_SV.read().sync_debounce.get().saturating_sub(write.last.elapsed()));
610
611                // tag events during write so that `read_impl` can avoid reading this change back
612                let _tag = WATCHER.annotate(data.note.clone());
613                while let Some(value) = data.write_value.lock().take() {
614                    (write.f)(value, WriteFile::open(data.note.0.clone()));
615                }
616
617                write.last = INSTANT.now();
618            }));
619
620            true
621        })
622        .perm();
623    }
624
625    /// Same operation as [`sync`] but also tracks the operation status in a second var.
626    ///
627    /// The status variable is set to [`WatcherReadStatus::reading`] as soon as `read` starts and
628    /// is set to [`WatcherReadStatus::idle`] when read returns. If read returns a value the status
629    /// only updates to idle when the new sync value is available, or because read the same value.
630    ///
631    /// The status variable is set to [`WatcherSyncStatus::writing`] as soon as it updates and
632    /// is set to [`WatcherReadStatus::idle`] only when the new sync value is available, either
633    /// by update or because read the same value.
634    ///
635    /// [`sync`]: Self::sync
636    pub fn sync_status<O, S, ER, EW>(
637        &self,
638        file: impl Into<PathBuf>,
639        init: O,
640        mut read: impl FnMut(io::Result<WatchFile>) -> Result<Option<O>, ER> + Send + 'static,
641        mut write: impl FnMut(O, io::Result<WriteFile>) -> Result<(), EW> + Send + 'static,
642    ) -> (Var<O>, Var<S>)
643    where
644        O: VarValue,
645        S: WatcherSyncStatus<ER, EW>,
646        ER: Any,
647        EW: Any,
648    {
649        let out = var(init);
650        let status = var(S::idle());
651        self.sync_status_impl(
652            absolutize_or_pass(file.into()),
653            out.as_any().clone(),
654            status.as_any().clone(),
655            Box::new(move |r| match read(r) {
656                Ok(Some(r)) => Ok(Some(BoxAnyVarValue::new(r))),
657                Ok(None) => Ok(None),
658                Err(e) => Err(Box::new(e)),
659            }),
660            AnyWatcherReadStatus::new::<ER, S>(),
661            Box::new(move |o, r| match write(o.downcast().unwrap(), r) {
662                Ok(()) => Ok(()),
663                Err(e) => Err(Box::new(e)),
664            }),
665            AnyWatcherWriteStatus::new::<ER, EW, S>(),
666        );
667        (out, status.read_only())
668    }
669    #[allow(clippy::too_many_arguments)]
670    fn sync_status_impl(
671        &self,
672        file: PathBuf,
673        out: AnyVar,
674        status: AnyVar,
675        read: Box<dyn FnMut(io::Result<WatchFile>) -> Result<Option<BoxAnyVarValue>, Box<dyn Any>> + Send + 'static>,
676        status_mtds: AnyWatcherReadStatus,
677        write: Box<dyn FnMut(BoxAnyVarValue, io::Result<WriteFile>) -> Result<(), Box<dyn Any>> + Send + 'static>,
678        status_w_mtds: AnyWatcherWriteStatus,
679    ) {
680        self.read_status_impl(file.clone(), out.downgrade(), status.clone(), read, status_mtds);
681
682        struct DataWrite {
683            f: Box<dyn FnMut(BoxAnyVarValue, io::Result<WriteFile>) -> Result<(), Box<dyn Any>> + Send + 'static>,
684            last: DInstant,
685        }
686        struct Data {
687            note: Arc<WatcherSyncWriteNote>,
688            write: Mutex<DataWrite>,
689            write_value: Mutex<Option<BoxAnyVarValue>>,
690            debounce_flush: Arc<SyncFlushLock>,
691            status: AnyVar,
692            status_w_mtds: AnyWatcherWriteStatus,
693        }
694        let data = Arc::new(Data {
695            note: Arc::new(WatcherSyncWriteNote(file)),
696            write: Mutex::new(DataWrite {
697                f: write,
698                last: DInstant::EPOCH,
699            }),
700            write_value: Mutex::new(None),
701            debounce_flush: Arc::new(SyncFlushLock::new()),
702            status,
703            status_w_mtds,
704        });
705        WATCHER_SV.write().push_sync_flush(&data.debounce_flush);
706
707        out.hook(move |a| {
708            // skip if value was set from `read_impl`
709            for tag in a.downcast_tags::<WatcherSyncWriteNote>() {
710                if *tag == *data.note {
711                    return true;
712                }
713            }
714
715            // spawn write
716            let mut d = data.write_value.lock();
717            let is_running = d.is_some() || data.write.try_lock().is_none();
718            *d = Some(a.value().clone_boxed());
719            if is_running {
720                return true;
721            }
722            data.status.set((data.status_w_mtds.writing)());
723            zng_task::spawn_wait(clmv!(data, || {
724                // wait other write tasks
725                let mut write = data.write.lock();
726                // wait debounce of APP.on_deinit signal, holds _guard to block on_deinit so the process is not killed while writing
727                let debounce = WATCHER_SV.read().sync_debounce.get().saturating_sub(write.last.elapsed());
728                let _guard = data.debounce_flush.begin_write(debounce);
729
730                // tag events during write so that `read_impl` can avoid reading this change back
731                let _tag = WATCHER.annotate(data.note.clone());
732
733                while let Some(value) = data.write_value.lock().take() {
734                    match (write.f)(value, WriteFile::open(data.note.0.clone())) {
735                        Ok(()) => data.status.set((data.status_w_mtds.idle)()),
736                        Err(e) => data.status.set((data.status_w_mtds.write_error)(e)),
737                    }
738                }
739
740                write.last = INSTANT.now();
741            }));
742
743            true
744        })
745        .perm();
746    }
747
748    /// Watch `file` and calls `handler` every time it changes.
749    ///
750    /// Note that the `handler` is blocking, use [`async_hn!`] and [`task::wait`] to run IO without
751    /// blocking the app.
752    ///
753    /// If `ignore_propagation` is `true` also calls `handler` when another handler marked as handled.
754    ///
755    /// [`async_hn!`]: macro@zng_app::handler::async_hn
756    /// [`task::wait`]: zng_task::wait
757    pub fn on_file_changed(&self, file: impl Into<PathBuf>, ignore_propagation: bool, handler: Handler<FsChangesArgs>) -> VarHandle {
758        let file = absolutize_or_pass(file.into());
759        let handle = self.watch(file.clone());
760        FS_CHANGES_EVENT.on_event(
761            ignore_propagation,
762            handler.filtered(move |args| {
763                let _handle = &handle;
764                args.events_for_path(&file).next().is_some()
765            }),
766        )
767    }
768
769    /// Watch `dir` and calls `handler` every time something inside it changes.
770    ///
771    /// Note that the `handler` is blocking, use [`async_hn!`] and [`task::wait`] to run IO without
772    /// blocking the app.
773    ///
774    /// If `ignore_propagation` is `true` also calls `handler` when another handler marked as handled.
775    ///
776    /// [`async_hn!`]: macro@zng_app::handler::async_hn
777    /// [`task::wait`]: zng_task::wait
778    pub fn on_dir_changed(
779        &self,
780        dir: impl Into<PathBuf>,
781        recursive: bool,
782        ignore_propagation: bool,
783        handler: Handler<FsChangesArgs>,
784    ) -> VarHandle {
785        let dir = absolutize_or_pass(dir.into());
786        let handle = self.watch_dir(dir.clone(), recursive);
787        FS_CHANGES_EVENT.on_event(
788            ignore_propagation,
789            handler.filtered(move |args| {
790                let _handle = &handle;
791                args.events_for_path(&dir).next().is_some()
792            }),
793        )
794    }
795
    /// Push a `note` that will be cloned on all subsequent change events until the returned handle is dropped.
    ///
    /// This can be used to tag all events that happened over a period of time, something you can't do just
    /// by receiving the events due to async delays caused by debounce.
    ///
    /// The notes of a specific type can be retrieved from each change using [`FsChange::notes`].
    ///
    /// Note that the underlying system events the [`notify`](https://docs.rs/notify) crate uses are not guaranteed to be synchronous.
    pub fn annotate(&self, note: Arc<dyn FsChangeNote>) -> FsChangeNoteHandle {
        // the watcher service tracks the note and provides the handle
        WATCHER_SV.write().annotate(note)
    }
805}
806
807fn has_sync_changes(args: &FsChangesArgs, path: &Path) -> bool {
808    let mut any = false;
809    for change in args.changes_for_path(path) {
810        for note in change.notes::<WatcherSyncWriteNote>() {
811            if note.as_path() == path {
812                return false;
813            }
814        }
815        any = true;
816    }
817    any
818}
819
820fn absolutize_or_pass(path: PathBuf) -> PathBuf {
821    match path.absolutize() {
822        Ok(p) if p != path => p.to_path_buf(),
823        _ => path,
824    }
825}
826
/// Represents a status type for [`WATCHER.sync_status`].
///
/// Extends [`WatcherReadStatus`] with the write states. The `ER` type is the read error and the `EW` type
/// is the write error, both are [`io::Error`] by default.
///
/// [`WATCHER.sync_status`]: WATCHER::sync_status
pub trait WatcherSyncStatus<ER = io::Error, EW = io::Error>: WatcherReadStatus<ER> {
    /// New writing value.
    fn writing() -> Self;
    /// New write error value, for the write error `e`.
    fn write_error(e: EW) -> Self;
}
836
/// Represents a status type for [`WATCHER`] read-only operations.
///
/// The `ER` type is the read error, it is [`io::Error`] by default. Status types must also be
/// variable values that can be compared for equality.
pub trait WatcherReadStatus<ER = io::Error>: VarValue + PartialEq {
    /// New idle value.
    fn idle() -> Self;
    /// New reading value.
    fn reading() -> Self;
    /// New read error value, for the read error `e`.
    fn read_error(e: ER) -> Self;
}
846
847struct AnyWatcherReadStatus {
848    idle: fn() -> BoxAnyVarValue,
849    reading: fn() -> BoxAnyVarValue,
850    read_error: fn(Box<dyn Any>) -> BoxAnyVarValue,
851}
852impl AnyWatcherReadStatus {
853    fn new<ER: Any, S: WatcherReadStatus<ER> + VarValue>() -> Self {
854        fn idle_impl<ER, S: WatcherReadStatus<ER>>() -> BoxAnyVarValue {
855            BoxAnyVarValue::new(S::idle())
856        }
857        fn reading_impl<ER, S: WatcherReadStatus<ER>>() -> BoxAnyVarValue {
858            BoxAnyVarValue::new(S::reading())
859        }
860        fn read_error_impl<ER: Any, S: WatcherReadStatus<ER> + VarValue>(e: Box<dyn Any>) -> BoxAnyVarValue {
861            BoxAnyVarValue::new(S::read_error(*e.downcast::<ER>().unwrap()))
862        }
863        Self {
864            idle: idle_impl::<ER, S>,
865            reading: reading_impl::<ER, S>,
866            read_error: read_error_impl::<ER, S>,
867        }
868    }
869}
870
871struct AnyWatcherWriteStatus {
872    idle: fn() -> BoxAnyVarValue,
873    writing: fn() -> BoxAnyVarValue,
874    write_error: fn(Box<dyn Any>) -> BoxAnyVarValue,
875}
876impl AnyWatcherWriteStatus {
877    fn new<ER, EW: Any, S: WatcherSyncStatus<ER, EW> + VarValue>() -> Self {
878        fn idle_impl<ER, S: WatcherReadStatus<ER>>() -> BoxAnyVarValue {
879            BoxAnyVarValue::new(S::idle())
880        }
881        fn writing_impl<ER, EW, S: WatcherSyncStatus<ER, EW>>() -> BoxAnyVarValue {
882            BoxAnyVarValue::new(S::writing())
883        }
884        fn write_error_impl<ER, EW: Any, S: WatcherSyncStatus<ER, EW> + VarValue>(e: Box<dyn Any>) -> BoxAnyVarValue {
885            BoxAnyVarValue::new(S::write_error(*e.downcast::<EW>().unwrap()))
886        }
887        Self {
888            idle: idle_impl::<ER, S>,
889            writing: writing_impl::<ER, EW, S>,
890            write_error: write_error_impl::<ER, EW, S>,
891        }
892    }
893}
894
895/// Represents an open read-only file provided by [`WATCHER.read`].
896///
897/// This type is a thin wrapper around the [`std::fs::File`] with some convenience parsing methods.
898///
899/// [`WATCHER.read`]: WATCHER::read
900#[derive(Debug)]
901pub struct WatchFile(fs::File);
902impl WatchFile {
903    /// Open read the file.
904    pub fn open(file: impl AsRef<Path>) -> io::Result<Self> {
905        Self::try_open_non_empty(file.as_ref(), true)
906    }
907    fn try_open_non_empty(path: &Path, retry: bool) -> io::Result<Self> {
908        let file = fs::File::open(path)?;
909
910        if retry && file.metadata()?.len() == 0 {
911            // some apps create an empty file unlocked, then write.
912            let _ = file;
913            std::thread::sleep(5.ms());
914            return Self::try_open_non_empty(path, false);
915        }
916
917        lock_shared(&file, Duration::from_secs(10))?;
918        Ok(Self(file))
919    }
920
921    /// Read the file contents as a text string.
922    pub fn text(&mut self) -> io::Result<Txt> {
923        self.string().map(Txt::from)
924    }
925
926    /// Read the file contents as a string.
927    pub fn string(&mut self) -> io::Result<String> {
928        use std::io::Read;
929        let mut s = String::new();
930        self.0.read_to_string(&mut s)?;
931        Ok(s)
932    }
933
934    /// Deserialize the file contents as JSON.
935    #[cfg(feature = "json")]
936    pub fn json<O>(&mut self) -> serde_json::Result<O>
937    where
938        O: serde::de::DeserializeOwned,
939    {
940        serde_json::from_reader(io::BufReader::new(&mut self.0))
941    }
942
943    /// Deserialize the file contents as TOML.
944    #[cfg(feature = "toml")]
945    pub fn toml<O>(&mut self) -> io::Result<O>
946    where
947        O: serde::de::DeserializeOwned,
948    {
949        use std::io::Read;
950        let mut buf = io::BufReader::new(&mut self.0);
951
952        let mut toml_str = String::new();
953        buf.read_to_string(&mut toml_str)?;
954
955        toml::de::from_str(&toml_str).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
956    }
957
958    /// Deserialize the file content as RON.
959    #[cfg(feature = "ron")]
960    pub fn ron<O>(&mut self) -> Result<O, ron::de::SpannedError>
961    where
962        O: serde::de::DeserializeOwned,
963    {
964        ron::de::from_reader(io::BufReader::new(&mut self.0))
965    }
966
967    /// Deserialize the file content as YAML.
968    #[cfg(feature = "yaml")]
969    pub fn yaml<O>(&mut self) -> serde_yaml::Result<O>
970    where
971        O: serde::de::DeserializeOwned,
972    {
973        serde_yaml::from_reader(io::BufReader::new(&mut self.0))
974    }
975
976    /// Read file and parse it.
977    pub fn parse<O: std::str::FromStr>(&mut self) -> Result<O, WatchFileParseError<O::Err>> {
978        use std::io::Read;
979        let mut s = String::new();
980        self.0.read_to_string(&mut s)?;
981        O::from_str(&s).map_err(WatchFileParseError::Parse)
982    }
983}
984impl ops::Deref for WatchFile {
985    type Target = fs::File;
986
987    fn deref(&self) -> &Self::Target {
988        &self.0
989    }
990}
991impl ops::DerefMut for WatchFile {
992    fn deref_mut(&mut self) -> &mut Self::Target {
993        &mut self.0
994    }
995}
996
// Unique identifier included in the name of the temp files created by `WriteFile::open`.
const TRANSACTION_GUID: &str = "6eIw3bYMS0uKaQMkTIQacQ";
// Suffix of the transaction lock file created by `WriteFile::open`, starts with the same identifier.
const TRANSACTION_LOCK_EXT: &str = "6eIw3bYMS0uKaQMkTIQacQ-lock.tmp";
999
/// Represents an open write file provided by [`WATCHER.sync`].
///
/// This struct writes to a temporary file and renames it over the actual file on commit only.
/// The dereferenced [`fs::File`] is the temporary file, not the actual one.
///
/// # Transaction
///
/// To minimize the risk of file corruption exclusive locks are used, both the target file and the temp file
/// are locked. An empty lock file is also used to cover the moment when both files are unlocked for the rename operation
/// and the moment the temp file is acquired.
///
/// The temp file is the actual file path with file extension replaced with `{path/.file-name.ext}.{GUID}-{n}.tmp`, the `n` is a
/// number from 0 to 999, if a temp file exists unlocked it will be reused.
///
/// The lock file is `{path/.file-name.ext}.{GUID}-lock.tmp`. Note that this
/// lock file only helps for apps that use [`WriteFile`], but even without it the risk is minimal as the slow
/// write operations are already flushed when it is time to commit.
///
/// [`WATCHER.sync`]: WATCHER::sync
pub struct WriteFile {
    // temp file that receives the writes, `None` after it is taken by the commit or cleanup
    temp_file: Option<fs::File>,
    // target file, kept open until it is taken by the commit or cleanup
    actual_file: Option<fs::File>,
    // lock file that covers the transaction, taken by the cleanup
    transaction_lock: Option<fs::File>,

    // absolute path of the target file
    actual_path: PathBuf,
    // path of `temp_file`
    temp_path: PathBuf,
    // path of `transaction_lock`
    transaction_path: PathBuf,

    // set by `clean`, if not set on drop the write was not committed nor cancelled
    cleaned: bool,
}
impl Drop for WriteFile {
    // Cleanup if the write was not finished with `commit` or `cancel`, this is logged as an error.
    fn drop(&mut self) {
        if !self.cleaned {
            tracing::error!("dropped sync write file without commit or cancel");
            self.clean();
        }
    }
}
impl ops::Deref for WriteFile {
    type Target = fs::File;

    // Gets the temp file, panics if it was already taken.
    fn deref(&self) -> &Self::Target {
        self.temp_file.as_ref().unwrap()
    }
}
impl ops::DerefMut for WriteFile {
    // Gets the temp file, panics if it was already taken.
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.temp_file.as_mut().unwrap()
    }
}
1050impl WriteFile {
1051    /// Open or create the file.
1052    pub fn open(path: PathBuf) -> io::Result<Self> {
1053        let actual_path = path.absolutize()?.into_owned();
1054        if !actual_path.exists()
1055            && let Some(parent) = actual_path.parent()
1056        {
1057            std::fs::create_dir_all(parent)?;
1058        }
1059
1060        let hidden_name = match actual_path.file_name() {
1061            Some(n) => format!(".{}", n.to_string_lossy()),
1062            None => return Err(io::Error::new(io::ErrorKind::InvalidInput, "expected file name")),
1063        };
1064
1065        let transaction_path = actual_path.with_file_name(format!("{hidden_name}.{TRANSACTION_LOCK_EXT}"));
1066        let transaction_lock = fs::OpenOptions::new()
1067            .create(true)
1068            .truncate(true)
1069            .write(true)
1070            .open(&transaction_path)?;
1071
1072        const TIMEOUT: Duration = Duration::from_secs(10);
1073
1074        lock_exclusive(&transaction_lock, TIMEOUT)?;
1075
1076        let actual_file = fs::OpenOptions::new().write(true).create(true).truncate(false).open(&actual_path)?;
1077        lock_exclusive(&actual_file, TIMEOUT)?;
1078
1079        let mut n = 0;
1080        let mut temp_path = actual_path.with_file_name(format!("{hidden_name}.{TRANSACTION_GUID}-{n}.tmp"));
1081        let temp_file = loop {
1082            if let Ok(f) = fs::OpenOptions::new().write(true).create(true).truncate(true).open(&temp_path)
1083                && f.try_lock().is_ok()
1084            {
1085                break f;
1086            }
1087
1088            n += 1;
1089            temp_path = actual_path.with_file_name(format!("{hidden_name}.{TRANSACTION_GUID}-{n}.tmp"));
1090            n += 1;
1091            if n > 1000 {
1092                return Err(io::Error::new(io::ErrorKind::AlreadyExists, "cannot create temporary file"));
1093            }
1094        };
1095
1096        Ok(Self {
1097            actual_file: Some(actual_file),
1098            temp_file: Some(temp_file),
1099            transaction_lock: Some(transaction_lock),
1100            actual_path,
1101            temp_path,
1102            transaction_path,
1103            cleaned: false,
1104        })
1105    }
1106
1107    /// Write the text string.
1108    pub fn write_text(&mut self, txt: &str) -> io::Result<()> {
1109        self.write_all(txt.as_bytes())
1110    }
1111
1112    /// Serialize and write.
1113    ///
1114    /// If `pretty` is `true` the JSON is formatted for human reading.
1115    #[cfg(feature = "json")]
1116    pub fn write_json<O: serde::Serialize>(&mut self, value: &O, pretty: bool) -> io::Result<()> {
1117        let mut buf = io::BufWriter::new(ops::DerefMut::deref_mut(self));
1118        if pretty {
1119            serde_json::to_writer_pretty(&mut buf, value)?;
1120        } else {
1121            serde_json::to_writer(&mut buf, value)?;
1122        }
1123        buf.flush()
1124    }
1125
1126    /// Serialize and write.
1127    ///
1128    /// If `pretty` is `true` the TOML is formatted for human reading.
1129    #[cfg(feature = "toml")]
1130    pub fn write_toml<O: serde::Serialize>(&mut self, value: &O, pretty: bool) -> io::Result<()> {
1131        let toml = if pretty {
1132            toml::ser::to_string_pretty(value)
1133        } else {
1134            toml::ser::to_string(value)
1135        }
1136        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
1137
1138        self.write_all(toml.as_bytes())
1139    }
1140
1141    /// Serialize and write.
1142    ///
1143    /// If `pretty` is `true` the RON if formatted for human reading using the default pretty config.
1144    #[cfg(feature = "ron")]
1145    pub fn write_ron<O: serde::Serialize>(&mut self, value: &O, pretty: bool) -> io::Result<()> {
1146        let buf = io::BufWriter::new(ops::DerefMut::deref_mut(self));
1147        struct Ffs<'a> {
1148            w: io::BufWriter<&'a mut fs::File>,
1149        }
1150        impl fmt::Write for Ffs<'_> {
1151            fn write_str(&mut self, s: &str) -> fmt::Result {
1152                self.w.write_all(s.as_bytes()).map_err(|_| fmt::Error)
1153            }
1154        }
1155        let mut buf = Ffs { w: buf };
1156        if pretty {
1157            ron::ser::to_writer_pretty(&mut buf, value, Default::default()).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
1158        } else {
1159            ron::ser::to_writer(&mut buf, value).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
1160        }
1161        buf.w.flush()
1162    }
1163
1164    /// Serialize and write.
1165    #[cfg(feature = "yaml")]
1166    pub fn write_yaml<O: serde::Serialize>(&mut self, value: &O) -> io::Result<()> {
1167        let mut buf = io::BufWriter::new(ops::DerefMut::deref_mut(self));
1168        serde_yaml::to_writer(&mut buf, value).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
1169        buf.flush()
1170    }
1171
1172    /// Commit write, flush and replace the actual file with the new one.
1173    pub fn commit(mut self) -> io::Result<()> {
1174        let r = self.replace_actual();
1175        self.clean();
1176        r
1177    }
1178
    /// Cancel write, the file will not be updated.
    ///
    /// The temp file and the transaction lock file are released and removed.
    pub fn cancel(mut self) {
        // marks as cleaned, so the drop does not log an error
        self.clean();
    }
1183
1184    fn replace_actual(&mut self) -> io::Result<()> {
1185        let mut temp_file = self.temp_file.take().unwrap();
1186        temp_file.flush()?;
1187        temp_file.sync_all()?;
1188
1189        unlock_ok(&temp_file).unwrap();
1190        drop(temp_file);
1191
1192        let actual_file = self.actual_file.take().unwrap();
1193        unlock_ok(&actual_file)?;
1194        drop(actual_file);
1195
1196        let mut retries = 0;
1197        loop {
1198            // commit by replacing the actual_path with already on disk temp_path file.
1199            match fs::rename(&self.temp_path, &self.actual_path) {
1200                Ok(()) => {
1201                    break;
1202                }
1203                Err(e) => match e.kind() {
1204                    io::ErrorKind::PermissionDenied => {
1205                        if retries == 5 {
1206                            // Give-up, we managed to write lock both temp and actual just
1207                            // before this, but now we can't replace actual and remove temp.
1208                            // Hardware issue? Or another process holding a lock for 1s+50ms*5.
1209                            return Err(e);
1210                        } else if retries > 0 {
1211                            // Second+ retries:
1212                            //
1213                            // probably a system issue.
1214                            //
1215                            // Windows sporadically returns ACCESS_DENIED for kernel!SetRenameInformationFile in
1216                            // other apps that use the same save pattern (write-tmp -> close-tmp -> rename).
1217                            // see GIMP issue: https://gitlab.gnome.org/GNOME/gimp/-/issues/1370
1218                            //
1219                            // I used procmon to trace all file operations, there is no other app trying to use
1220                            // the temp and actual files when the ACCESS_DENIED occurs, both files are unlocked and
1221                            // closed before the rename calls start. This might be a Windows bug.
1222                            std::thread::sleep(30.ms());
1223                        } else {
1224                            // first retry:
1225                            //
1226                            // probably another process reading the `actual_path`.
1227                            //
1228                            // Reacquire a write lock and unlock, just to wait the external app.
1229                            match std::fs::File::options().write(true).open(&self.actual_path) {
1230                                Ok(f) => {
1231                                    if lock_exclusive(&f, 10.secs()).is_ok() {
1232                                        // acquired actual ok, retry
1233                                        let _ = unlock_ok(&f);
1234                                    }
1235                                }
1236                                Err(e) => match e.kind() {
1237                                    io::ErrorKind::NotFound => {
1238                                        // all good, rename will create actual
1239                                        continue;
1240                                    }
1241                                    _ => {
1242                                        // unknown error, let retry handle it
1243                                        std::thread::sleep(30.ms());
1244                                    }
1245                                },
1246                            }
1247                        }
1248
1249                        retries += 1;
1250                    }
1251                    _ => return Err(e),
1252                },
1253            }
1254        }
1255
1256        Ok(())
1257    }
1258
1259    fn clean(&mut self) {
1260        self.cleaned = true;
1261
1262        if let Some(tmp) = self.temp_file.take() {
1263            let _ = tmp.unlock();
1264        }
1265        if let Err(e) = fs::remove_file(&self.temp_path) {
1266            tracing::debug!("failed to cleanup temp file, {e}")
1267        }
1268
1269        if let Some(file) = self.actual_file.take() {
1270            let _ = file.unlock();
1271        }
1272
1273        let transaction = self.transaction_lock.take().unwrap();
1274        let _ = transaction.unlock();
1275        let _ = fs::remove_file(&self.transaction_path);
1276    }
1277}
1278
/// Error for [`WatchFile::parse`].
///
/// Can be created from an [`io::Error`], as a read error.
#[derive(Debug)]
#[non_exhaustive]
pub enum WatchFileParseError<E> {
    /// Error reading the file.
    Io(io::Error),
    /// Error parsing the file.
    Parse(E),
}
impl<E> From<io::Error> for WatchFileParseError<E> {
    fn from(value: io::Error) -> Self {
        WatchFileParseError::Io(value)
    }
}
impl<E: fmt::Display> fmt::Display for WatchFileParseError<E> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Io(e) => write!(f, "read error, {e}"),
            Self::Parse(e) => write!(f, "parse error, {e}"),
        }
    }
}
impl<E: std::error::Error + 'static> std::error::Error for WatchFileParseError<E> {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // the wrapped error is always the source
        let source: &(dyn std::error::Error + 'static) = match self {
            Self::Io(e) => e,
            Self::Parse(e) => e,
        };
        Some(source)
    }
}
1309
/// Represents a [`FsChange`] note.
///
/// This trait is already implemented for all types it applies, that is, all types that
/// are `Debug + Any + Send + Sync`.
#[diagnostic::on_unimplemented(note = "`FsChangeNote` is implemented for all `T: Debug + Any + Send + Sync`")]
pub trait FsChangeNote: fmt::Debug + Any + Send + Sync {
    /// Access any.
    ///
    /// The returned value can be downcast to the note type.
    fn as_any(&self) -> &dyn Any;
}
impl<T> FsChangeNote for T
where
    T: fmt::Debug + Any + Send + Sync,
{
    fn as_any(&self) -> &dyn Any {
        self
    }
}
1323
/// Handle that holds a [`WATCHER.annotate`] note.
///
/// The handle is a shared reference to the note, it can be cloned.
///
/// [`WATCHER.annotate`]: WATCHER::annotate
#[derive(Clone)]
#[must_use = "the note is removed when the handle is dropped"]
pub struct FsChangeNoteHandle(#[expect(dead_code)] Arc<Arc<dyn FsChangeNote>>);
1330
/// Annotation for file watcher events and var update tags.
///
/// Identifies the [`WATCHER.sync`] file that is currently being written to, the note
/// dereferences to the file path.
///
/// [`WATCHER.sync`]: WATCHER::sync
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct WatcherSyncWriteNote(PathBuf);
impl WatcherSyncWriteNote {
    /// Deref.
    pub fn as_path(&self) -> &Path {
        self.0.as_path()
    }
}
impl ops::Deref for WatcherSyncWriteNote {
    type Target = Path;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
1351
/// File system change event types.
///
/// The event for each change is available in [`FsChange::event`].
///
/// This module re-exports types from the [`notify`](https://docs.rs/notify) crate.
pub mod fs_event {
    // the event type and the types that classify it
    pub use notify::event::{
        AccessKind, AccessMode, CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind, RenameMode,
    };
    // the error type and its kind, also available in `FsChange::event`
    pub use notify::{Error, ErrorKind};
}
1363
1364/// Represents a single file system change, annotated.
1365#[derive(Debug, Clone)]
1366#[non_exhaustive]
1367pub struct FsChange {
1368    /// All [`WATCHER.annotate`] that where set when this event happened.
1369    ///
1370    /// [`WATCHER.annotate`]: WATCHER::annotate
1371    pub notes: Vec<Arc<dyn FsChangeNote>>,
1372
1373    /// The actual notify event or error.
1374    pub event: Result<fs_event::Event, Arc<fs_event::Error>>,
1375}
1376impl PartialEq for FsChange {
1377    fn eq(&self, other: &Self) -> bool {
1378        self.notes.len() == other.notes.len()
1379            && self.notes.iter().zip(other.notes.iter()).all(|(a, b)| Arc::ptr_eq(a, b))
1380            && match (&self.event, &other.event) {
1381                (Ok(a), Ok(b)) => a == b,
1382                (Err(a), Err(b)) => Arc::ptr_eq(a, b),
1383                _ => false,
1384            }
1385    }
1386}
1387impl FsChange {
1388    /// If the change affects the `path`.
1389    pub fn is_for_path(&self, path: &Path) -> bool {
1390        if let Ok(ev) = &self.event {
1391            return ev.paths.iter().any(|p| p.starts_with(path));
1392        }
1393        false
1394    }
1395
1396    /// If the change affects any path matched by the glob pattern.
1397    pub fn is_for_glob(&self, pattern: &glob::Pattern) -> bool {
1398        if let Ok(ev) = &self.event {
1399            return ev.paths.iter().any(|p| pattern.matches_path(p));
1400        }
1401        false
1402    }
1403
1404    /// Iterate over all notes of the type `T`.
1405    pub fn notes<T: FsChangeNote>(&self) -> impl Iterator<Item = &T> {
1406        self.notes.iter().filter_map(|n| FsChangeNote::as_any(&**n).downcast_ref::<T>())
1407    }
1408}
1409
event_args! {
    /// [`FS_CHANGES_EVENT`] arguments.
    ///
    /// Contains all the changes, events and errors, collected since the last event.
    pub struct FsChangesArgs {
        /// All notify changes since the last event.
        pub changes: Arc<Vec<FsChange>>,

        ..

        /// None, only app level handlers receive this event.
        ///
        /// Always returns `false`.
        fn is_in_target(&self, _id: WidgetId) -> bool {
            false
        }
    }
}
1424impl FsChangesArgs {
1425    /// Iterate over all change events.
1426    pub fn events(&self) -> impl Iterator<Item = &fs_event::Event> + '_ {
1427        self.changes.iter().filter_map(|r| r.event.as_ref().ok())
1428    }
1429
1430    /// Iterate over all file watcher errors.
1431    pub fn errors(&self) -> impl Iterator<Item = &notify::Error> + '_ {
1432        self.changes.iter().filter_map(|r| r.event.as_ref().err().map(|e| &**e))
1433    }
1434
1435    /// Returns `true` is some events where lost.
1436    ///
1437    /// This indicates either a lapse in the events or a change in the filesystem such that events
1438    /// received so far can no longer be relied on to represent the state of the filesystem now.
1439    ///
1440    /// An application that simply reacts to file changes may not care about this. An application
1441    /// that keeps an in-memory representation of the filesystem will need to care, and will need
1442    /// to refresh that representation directly from the filesystem.
1443    pub fn rescan(&self) -> bool {
1444        self.events().any(|e| e.need_rescan())
1445    }
1446
1447    /// Iterate over all changes that affects paths selected by the `glob` pattern.
1448    pub fn changes_for(&self, glob: &str) -> Result<impl Iterator<Item = &FsChange> + '_, glob::PatternError> {
1449        let glob = glob::Pattern::new(glob)?;
1450        Ok(self.changes.iter().filter(move |c| c.is_for_glob(&glob)))
1451    }
1452
1453    /// Iterate over all changes that affects paths that are equal to `path` or inside it.
1454    pub fn changes_for_path<'a>(&'a self, path: &'a Path) -> impl Iterator<Item = &'a FsChange> + 'a {
1455        self.changes.iter().filter(move |c| c.is_for_path(path))
1456    }
1457
1458    /// Iterate over all change events that affects that are equal to `path` or inside it.
1459    pub fn events_for(&self, glob: &str) -> Result<impl Iterator<Item = &fs_event::Event> + '_, glob::PatternError> {
1460        let glob = glob::Pattern::new(glob)?;
1461        Ok(self.events().filter(move |ev| ev.paths.iter().any(|p| glob.matches_path(p))))
1462    }
1463
1464    /// Iterate over all change events that affects paths that are equal to `path` or inside it.
1465    pub fn events_for_path<'a>(&'a self, path: &'a Path) -> impl Iterator<Item = &'a fs_event::Event> + 'a {
1466        self.events().filter(move |ev| ev.paths.iter().any(|p| p.starts_with(path)))
1467    }
1468}
1469
zng_app::event::event! {
    /// Event sent by the [`WATCHER`] service on directories or files that are watched.
    ///
    /// The event arguments are [`FsChangesArgs`].
    pub static FS_CHANGES_EVENT: FsChangesArgs;
}
1474
/// Represents an active file or directory watcher in [`WATCHER`].
///
/// This type wraps a [`Handle`], all methods forward to it.
#[derive(Clone)]
#[must_use = "the watcher is dropped if the handle is dropped"]
pub struct WatcherHandle(Handle<()>);

impl WatcherHandle {
    /// Handle to no watcher.
    pub fn dummy() -> Self {
        // dummy inner handle, not connected to any watcher
        Self(Handle::dummy(()))
    }

    /// If [`perm`](Self::perm) was called in another clone of this handle.
    ///
    /// If `true` the resource will stay in memory for the duration of the app, unless [`force_drop`](Self::force_drop)
    /// is also called.
    pub fn is_permanent(&self) -> bool {
        self.0.is_permanent()
    }

    /// Force drops the watcher, meaning it will be dropped even if there are other handles active.
    ///
    /// This consumes the handle.
    pub fn force_drop(self) {
        self.0.force_drop()
    }

    /// If the watcher is dropped.
    pub fn is_dropped(&self) -> bool {
        self.0.is_dropped()
    }

    /// Drop the handle without dropping the watcher, the watcher will stay active for the
    /// duration of the app process.
    ///
    /// This consumes the handle.
    pub fn perm(self) {
        self.0.perm()
    }
}