zng_ext_fs_watcher/
service.rs

1use std::{
2    collections::{HashMap, hash_map},
3    io, mem,
4    path::{Path, PathBuf},
5    sync::{Arc, atomic::AtomicBool},
6    time::{Duration, SystemTime},
7};
8
9use atomic::{Atomic, Ordering};
10use notify::Watcher;
11use parking_lot::Mutex;
12use path_absolutize::Absolutize;
13use zng_app::{
14    DInstant, INSTANT, app_hn_once,
15    timer::{DeadlineHandle, TIMERS},
16};
17use zng_app_context::{LocalContext, app_local};
18use zng_clone_move::clmv;
19use zng_handle::{Handle, HandleOwner};
20use zng_unit::TimeUnits;
21use zng_var::{
22    AnyVar, AnyVarHookArgs, AnyWeakVar, ArcVar, ReadOnlyArcVar, VARS, Var, VarUpdateId, VarValue, WeakVar, types::WeakArcVar, var,
23};
24
25use crate::{
26    FS_CHANGES_EVENT, FsChange, FsChangeNote, FsChangeNoteHandle, FsChangesArgs, WATCHER, WatchFile, WatcherHandle, WatcherReadStatus,
27    WatcherSyncStatus, WatcherSyncWriteNote, WriteFile, fs_event,
28};
29
30use zng_task as task;
31
32#[cfg(target_has_atomic = "64")]
33use std::sync::atomic::AtomicU64;
34
35#[cfg(not(target_has_atomic = "64"))]
36struct AtomicU64(Mutex<u64>);
37#[cfg(not(target_has_atomic = "64"))]
38impl AtomicU64 {
39    pub const fn new(u: u64) -> Self {
40        Self(Mutex::new(u))
41    }
42
43    pub fn load(&self, _: Ordering) -> u64 {
44        *self.0.lock()
45    }
46
47    pub fn store(&self, _: Ordering, u: u64) {
48        *self.0.lock() = u;
49    }
50}
51
52app_local! {
53    pub(crate) static WATCHER_SV: WatcherService = WatcherService::new();
54}
55
56pub(crate) struct WatcherService {
57    pub debounce: ArcVar<Duration>,
58    pub sync_debounce: ArcVar<Duration>,
59    pub poll_interval: ArcVar<Duration>,
60    pub shutdown_timeout: ArcVar<Duration>,
61
62    watcher: Watchers,
63
64    debounce_oldest: DInstant,
65    debounce_buffer: Vec<FsChange>,
66    debounce_timer: Option<DeadlineHandle>,
67
68    read_to_var: Vec<ReadToVar>,
69    sync_with_var: Vec<SyncWithVar>,
70
71    notes: Vec<std::sync::Weak<Arc<dyn FsChangeNote>>>,
72}
73impl WatcherService {
74    fn new() -> Self {
75        Self {
76            debounce: var(100.ms()),
77            sync_debounce: var(100.ms()),
78            poll_interval: var(1.secs()),
79            shutdown_timeout: var(1.minutes()),
80            watcher: Watchers::new(),
81            debounce_oldest: INSTANT.now(),
82            debounce_buffer: vec![],
83            debounce_timer: None,
84            read_to_var: vec![],
85            sync_with_var: vec![],
86            notes: vec![],
87        }
88    }
89
90    pub fn init_watcher(&mut self) {
91        self.watcher.init();
92    }
93
94    pub fn event(&mut self, args: &FsChangesArgs) {
95        self.read_to_var.retain_mut(|f| f.on_event(args));
96        self.sync_with_var.retain_mut(|f| f.on_event(args));
97    }
98
99    pub fn low_memory(&mut self) {
100        self.read_to_var.retain_mut(|v| v.retain());
101        let sync_debounce = self.sync_debounce.get();
102        self.sync_with_var.retain_mut(|v| v.retain(sync_debounce));
103    }
104
105    pub fn update(&mut self) {
106        if let Some(n) = self.poll_interval.get_new() {
107            self.watcher.set_poll_interval(n);
108        }
109        if !self.debounce_buffer.is_empty() {
110            if let Some(n) = self.debounce.get_new() {
111                if self.debounce_oldest.elapsed() >= n {
112                    self.notify();
113                }
114            }
115        }
116        self.read_to_var.retain_mut(|f| f.retain());
117        let sync_debounce = self.sync_debounce.get();
118        self.sync_with_var.retain_mut(|f| f.retain(sync_debounce));
119    }
120
121    pub fn watch(&mut self, file: PathBuf) -> WatcherHandle {
122        self.watcher.watch(file)
123    }
124
125    pub fn watch_dir(&mut self, dir: PathBuf, recursive: bool) -> WatcherHandle {
126        self.watcher.watch_dir(dir, recursive)
127    }
128
129    pub fn read<O: VarValue>(
130        &mut self,
131        file: PathBuf,
132        init: O,
133        read: impl FnMut(io::Result<WatchFile>) -> Option<O> + Send + 'static,
134    ) -> ReadOnlyArcVar<O> {
135        let handle = self.watch(file.clone());
136        fn open(p: &Path) -> io::Result<WatchFile> {
137            WatchFile::open(p)
138        }
139        let (read, var) = ReadToVar::new(handle, file, init, open, read, || {});
140        self.read_to_var.push(read);
141        var
142    }
143
144    pub fn read_status<O, S, E>(
145        &mut self,
146        file: PathBuf,
147        init: O,
148        mut read: impl FnMut(io::Result<WatchFile>) -> Result<Option<O>, E> + Send + 'static,
149    ) -> (ReadOnlyArcVar<O>, ReadOnlyArcVar<S>)
150    where
151        O: VarValue,
152        S: WatcherReadStatus<E>,
153    {
154        let handle = self.watch(file.clone());
155        fn open(p: &Path) -> io::Result<WatchFile> {
156            WatchFile::open(p)
157        }
158        let status = var(S::reading());
159
160        let (read, var) = ReadToVar::new(
161            handle,
162            file,
163            init,
164            open,
165            // read
166            clmv!(status, |d| {
167                status.set(S::reading());
168                match read(d) {
169                    Ok(r) => {
170                        if r.is_none() {
171                            status.set(S::idle());
172                        }
173                        r
174                    }
175                    Err(e) => {
176                        status.set(S::read_error(e));
177                        None
178                    }
179                }
180            }),
181            // on_modify
182            clmv!(status, || {
183                status.set(S::idle());
184            }),
185        );
186        self.read_to_var.push(read);
187
188        (var, status.read_only())
189    }
190
191    pub fn read_dir<O: VarValue>(
192        &mut self,
193        dir: PathBuf,
194        recursive: bool,
195        init: O,
196        read: impl FnMut(walkdir::WalkDir) -> Option<O> + Send + 'static,
197    ) -> ReadOnlyArcVar<O> {
198        let handle = self.watch_dir(dir.clone(), recursive);
199        fn open(p: &Path) -> walkdir::WalkDir {
200            walkdir::WalkDir::new(p).min_depth(1).max_depth(1)
201        }
202        fn open_recursive(p: &Path) -> walkdir::WalkDir {
203            walkdir::WalkDir::new(p).min_depth(1)
204        }
205        let (read, var) = ReadToVar::new(handle, dir, init, if recursive { open_recursive } else { open }, read, || {});
206        self.read_to_var.push(read);
207        var
208    }
209    pub fn read_dir_status<O, S, E>(
210        &mut self,
211        dir: PathBuf,
212        recursive: bool,
213        init: O,
214        mut read: impl FnMut(walkdir::WalkDir) -> Result<Option<O>, E> + Send + 'static,
215    ) -> (ReadOnlyArcVar<O>, ReadOnlyArcVar<S>)
216    where
217        O: VarValue,
218        S: WatcherReadStatus<E>,
219    {
220        let status = var(S::reading());
221
222        let handle = self.watch_dir(dir.clone(), recursive);
223        fn open(p: &Path) -> walkdir::WalkDir {
224            walkdir::WalkDir::new(p).min_depth(1).max_depth(1)
225        }
226        fn open_recursive(p: &Path) -> walkdir::WalkDir {
227            walkdir::WalkDir::new(p).min_depth(1)
228        }
229
230        let (read, var) = ReadToVar::new(
231            handle,
232            dir,
233            init,
234            if recursive { open_recursive } else { open },
235            // read
236            clmv!(status, |d| {
237                status.set(S::reading());
238                match read(d) {
239                    Ok(r) => {
240                        if r.is_none() {
241                            status.set(S::idle());
242                        }
243                        r
244                    }
245                    Err(e) => {
246                        status.set(S::read_error(e));
247                        None
248                    }
249                }
250            }),
251            // on_modify
252            clmv!(status, || {
253                status.set(S::idle());
254            }),
255        );
256        self.read_to_var.push(read);
257
258        (var, status.read_only())
259    }
260
261    pub fn sync<O: VarValue>(
262        &mut self,
263        file: PathBuf,
264        init: O,
265        read: impl FnMut(io::Result<WatchFile>) -> Option<O> + Send + 'static,
266        mut write: impl FnMut(O, io::Result<WriteFile>) + Send + 'static,
267    ) -> ArcVar<O> {
268        let handle = self.watch(file.clone());
269
270        let (sync, var) = SyncWithVar::new(handle, file, init, read, move |o, _, f| write(o, f), |_| {});
271        self.sync_with_var.push(sync);
272        var
273    }
274
275    pub fn sync_status<O, S, ER, EW>(
276        &mut self,
277        file: PathBuf,
278        init: O,
279        mut read: impl FnMut(io::Result<WatchFile>) -> Result<Option<O>, ER> + Send + 'static,
280        mut write: impl FnMut(O, io::Result<WriteFile>) -> Result<(), EW> + Send + 'static,
281    ) -> (ArcVar<O>, ReadOnlyArcVar<S>)
282    where
283        O: VarValue,
284        S: WatcherSyncStatus<ER, EW>,
285    {
286        let handle = self.watch(file.clone());
287        let latest_write = Arc::new(Atomic::new(VarUpdateId::never()));
288
289        let status = var(S::reading());
290        let (sync, var) = SyncWithVar::new(
291            handle,
292            file,
293            init,
294            // read
295            clmv!(status, |f| {
296                status.set(S::reading());
297                match read(f) {
298                    Ok(r) => {
299                        if r.is_none() {
300                            status.set(S::idle());
301                        }
302                        r
303                    }
304                    Err(e) => {
305                        status.set(S::read_error(e));
306                        None
307                    }
308                }
309            }),
310            // write
311            clmv!(status, latest_write, |o, o_id, f| {
312                status.set(S::writing()); // init write
313                match write(o, f) {
314                    Ok(()) => {
315                        if latest_write.load(Ordering::Relaxed) == o_id {
316                            status.set(S::idle());
317                        }
318                    }
319                    Err(e) => {
320                        status.set(S::write_error(e));
321                    }
322                }
323            }),
324            // hook&modify
325            clmv!(status, |is_read| {
326                status.set(if is_read {
327                    S::idle()
328                } else {
329                    let id = VARS.update_id();
330                    latest_write.store(id, Ordering::Relaxed);
331
332                    S::writing()
333                });
334            }),
335        );
336
337        self.sync_with_var.push(sync);
338
339        (var, status.read_only())
340    }
341
342    fn on_watcher(&mut self, r: Result<fs_event::Event, fs_event::Error>) {
343        if let Ok(r) = &r {
344            if !self.watcher.allow(r) {
345                // file parent watcher, file not affected.
346                return;
347            }
348        }
349
350        let notify = self.debounce_oldest.elapsed() >= self.debounce.get();
351
352        let mut notes = Vec::with_capacity(self.notes.len());
353        self.notes.retain(|n| match n.upgrade() {
354            Some(n) => {
355                notes.push(Arc::clone(&*n));
356                true
357            }
358            None => false,
359        });
360
361        self.debounce_buffer.push(FsChange { notes, event: r });
362
363        if notify {
364            self.notify();
365        } else if self.debounce_timer.is_none() {
366            self.debounce_timer = Some(TIMERS.on_deadline(
367                self.debounce.get(),
368                app_hn_once!(|_| {
369                    WATCHER_SV.write().on_debounce_timer();
370                }),
371            ));
372        }
373    }
374
375    pub fn annotate(&mut self, note: Arc<dyn FsChangeNote>) -> FsChangeNoteHandle {
376        let handle = Arc::new(note);
377        self.notes.push(Arc::downgrade(&handle));
378        FsChangeNoteHandle(handle)
379    }
380
381    fn on_debounce_timer(&mut self) {
382        if !self.debounce_buffer.is_empty() {
383            self.notify();
384        }
385    }
386
387    fn notify(&mut self) {
388        let changes = mem::take(&mut self.debounce_buffer);
389        let now = INSTANT.now();
390        self.debounce_oldest = now;
391        self.debounce_timer = None;
392
393        FS_CHANGES_EVENT.notify(FsChangesArgs::new(now, Default::default(), changes));
394    }
395
396    /// Deinit watcher, returns items to flush without a service lock.
397    pub(crate) fn shutdown(&mut self) -> Vec<SyncWithVar> {
398        self.watcher.deinit();
399        mem::take(&mut self.sync_with_var)
400    }
401}
402fn notify_watcher_handler() -> impl notify::EventHandler {
403    let mut ctx = LocalContext::capture();
404    move |r| ctx.with_context(|| WATCHER_SV.write().on_watcher(r))
405}
406
407struct ReadToVar {
408    read: Box<dyn Fn(&Arc<AtomicBool>, &WatcherHandle, ReadEvent) + Send + Sync>,
409    pending: Arc<AtomicBool>,
410    handle: WatcherHandle,
411}
412impl ReadToVar {
413    fn new<O: VarValue, R: 'static>(
414        handle: WatcherHandle,
415        mut path: PathBuf,
416        init: O,
417        load: fn(&Path) -> R,
418        read: impl FnMut(R) -> Option<O> + Send + 'static,
419        on_modify: impl Fn() + Send + Sync + 'static,
420    ) -> (Self, ReadOnlyArcVar<O>) {
421        if let Ok(p) = path.absolutize() {
422            path = p.into_owned();
423        }
424        let path = Arc::new(path);
425        let var = var(init);
426        let on_modify = Arc::new(on_modify);
427
428        let pending = Arc::new(AtomicBool::new(false));
429        let read = Arc::new(Mutex::new(read));
430        let wk_var = var.downgrade();
431
432        // read task "drains" pending, drops handle if the var is dropped.
433        let read = Box::new(move |pending: &Arc<AtomicBool>, handle: &WatcherHandle, ev: ReadEvent| {
434            if wk_var.strong_count() == 0 {
435                handle.clone().force_drop();
436                return;
437            }
438
439            let spawn = match ev {
440                ReadEvent::Update => false,
441                ReadEvent::Event(args) => !pending.load(Ordering::Relaxed) && args.events_for_path(&path).next().is_some(),
442                ReadEvent::Init => true,
443            };
444
445            if !spawn {
446                return;
447            }
448
449            pending.store(true, Ordering::Relaxed);
450            if read.try_lock().is_none() {
451                // another task already running.
452                return;
453            }
454            task::spawn_wait(clmv!(read, wk_var, path, handle, pending, on_modify, || {
455                let mut read = read.lock();
456                while pending.swap(false, Ordering::Relaxed) {
457                    if let Some(update) = read(load(path.as_path())) {
458                        if let Some(var) = wk_var.upgrade() {
459                            var.modify(clmv!(on_modify, |vm| {
460                                vm.set(update);
461                                on_modify();
462                            }));
463                        } else {
464                            // var dropped
465                            handle.force_drop();
466                            break;
467                        }
468                    }
469                }
470            }));
471        });
472        read(&pending, &handle, ReadEvent::Init);
473
474        (Self { read, pending, handle }, var.read_only())
475    }
476
477    /// Match the event and flag variable update.
478    ///
479    /// Returns if the variable is still alive.
480    pub fn on_event(&mut self, args: &FsChangesArgs) -> bool {
481        if !self.handle.is_dropped() {
482            (self.read)(&self.pending, &self.handle, ReadEvent::Event(args));
483        }
484        !self.handle.is_dropped()
485    }
486
487    /// Returns if the variable is still alive.
488    fn retain(&mut self) -> bool {
489        if !self.handle.is_dropped() {
490            (self.read)(&self.pending, &self.handle, ReadEvent::Update);
491        }
492        !self.handle.is_dropped()
493    }
494}
495enum ReadEvent<'a> {
496    Update,
497    Event(&'a FsChangesArgs),
498    Init,
499}
500
501pub(crate) struct SyncWithVar {
502    task: Box<dyn Fn(&WatcherHandle, SyncEvent) + Send + Sync>,
503    handle: WatcherHandle,
504}
505impl SyncWithVar {
506    fn new<O, R, W, U>(handle: WatcherHandle, mut file: PathBuf, init: O, read: R, write: W, var_hook_and_modify: U) -> (Self, ArcVar<O>)
507    where
508        O: VarValue,
509        R: FnMut(io::Result<WatchFile>) -> Option<O> + Send + 'static,
510        W: FnMut(O, VarUpdateId, io::Result<WriteFile>) + Send + 'static,
511        U: Fn(bool) + Send + Sync + 'static,
512    {
513        if let Ok(p) = file.absolutize() {
514            file = p.into_owned();
515        }
516
517        let path = Arc::new(WatcherSyncWriteNote(file));
518        let latest_from_read = Arc::new(AtomicBool::new(false));
519
520        let var_hook_and_modify = Arc::new(var_hook_and_modify);
521
522        let var = var(init);
523        var.hook_any(Box::new(clmv!(
524            path,
525            latest_from_read,
526            var_hook_and_modify,
527            |args: &AnyVarHookArgs| {
528                let is_read = args.downcast_tags::<Arc<WatcherSyncWriteNote>>().any(|n| n == &path);
529                latest_from_read.store(is_read, Ordering::Relaxed);
530                var_hook_and_modify(is_read);
531                true
532            }
533        )))
534        .perm();
535
536        type PendingFlag = u8;
537        const READ: PendingFlag = 0b01;
538        const WRITE: PendingFlag = 0b11;
539
540        struct TaskData<R, W, O: VarValue> {
541            pending: Atomic<PendingFlag>,
542            read_write: Mutex<(R, W)>,
543            wk_var: WeakArcVar<O>,
544            last_write: AtomicU64, // ms from epoch
545        }
546        let task_data = Arc::new(TaskData {
547            pending: Atomic::new(0),
548            read_write: Mutex::new((read, write)),
549            wk_var: var.downgrade(),
550            last_write: AtomicU64::new(0),
551        });
552
553        // task drains pending, drops handle if the var is dropped.
554        let task = Box::new(move |handle: &WatcherHandle, ev: SyncEvent| {
555            let var = match task_data.wk_var.upgrade() {
556                Some(v) => v,
557                None => {
558                    handle.clone().force_drop();
559                    return;
560                }
561            };
562
563            let mut debounce = None;
564
565            let mut pending = 0;
566
567            match ev {
568                SyncEvent::Update(sync_debounce) => {
569                    if var.is_new() && !latest_from_read.load(Ordering::Relaxed) {
570                        debounce = Some(sync_debounce);
571                        pending |= WRITE;
572                    } else {
573                        return;
574                    }
575                }
576                SyncEvent::Event(args) => {
577                    if args.rescan() {
578                        pending |= READ;
579                    } else {
580                        'ev: for ev in args.changes_for_path(&path) {
581                            for note in ev.notes::<WatcherSyncWriteNote>() {
582                                if path.as_path() == note.as_path() {
583                                    // we caused this event
584                                    continue 'ev;
585                                }
586                            }
587
588                            pending |= READ;
589                            break;
590                        }
591                        if pending == 0 {
592                            return;
593                        }
594                    }
595                }
596                SyncEvent::Init => {
597                    if path.exists() {
598                        pending |= READ;
599                    } else {
600                        pending |= WRITE;
601                    }
602                }
603                SyncEvent::FlushShutdown => {
604                    let timeout = WATCHER_SV.read().shutdown_timeout.get();
605                    if task_data.read_write.try_lock_for(timeout).is_none() {
606                        tracing::error!("not all io operations finished on shutdown, timeout after {timeout:?}");
607                    }
608                    return;
609                }
610            };
611            drop(var);
612
613            task_data.pending.fetch_or(pending, Ordering::Relaxed);
614
615            if task_data.read_write.try_lock().is_none() {
616                // another spawn is already applying
617                return;
618            }
619            task::spawn_wait(clmv!(task_data, path, var_hook_and_modify, handle, || {
620                let mut read_write = task_data.read_write.lock();
621                let (read, write) = &mut *read_write;
622
623                loop {
624                    let pending = task_data.pending.swap(0, Ordering::Relaxed);
625
626                    if pending == WRITE {
627                        if let Some(d) = debounce {
628                            let now_ms = SystemTime::now()
629                                .duration_since(SystemTime::UNIX_EPOCH)
630                                .unwrap_or_default()
631                                .as_millis() as u64;
632                            let prev_ms = task_data.last_write.load(Ordering::Relaxed);
633                            let elapsed = Duration::from_millis(now_ms - prev_ms);
634                            if elapsed < d {
635                                std::thread::sleep(d - elapsed);
636                            }
637                            task_data.last_write.store(now_ms, Ordering::Relaxed);
638                        }
639
640                        let (id, value) = if let Some(var) = task_data.wk_var.upgrade() {
641                            (var.last_update(), var.get())
642                        } else {
643                            handle.force_drop();
644                            return;
645                        };
646
647                        {
648                            let _note = WATCHER.annotate(path.clone());
649                            write(value, id, WriteFile::open(path.to_path_buf()));
650                        }
651
652                        if task_data.wk_var.strong_count() == 0 {
653                            handle.force_drop();
654                            return;
655                        }
656                    } else if pending == READ {
657                        if task_data.wk_var.strong_count() == 0 {
658                            handle.force_drop();
659                            return;
660                        }
661
662                        if let Some(update) = read(WatchFile::open(path.as_path())) {
663                            if let Some(var) = task_data.wk_var.upgrade() {
664                                var.modify(clmv!(path, var_hook_and_modify, |vm| {
665                                    vm.set(update);
666                                    vm.push_tag(path);
667                                    var_hook_and_modify(true);
668                                }));
669                            } else {
670                                handle.force_drop();
671                                return;
672                            }
673                        }
674                    } else {
675                        return;
676                    }
677                }
678            }));
679        });
680
681        task(&handle, SyncEvent::Init);
682
683        (Self { task, handle }, var)
684    }
685
686    /// Match the event and flag variable update.
687    ///
688    /// Returns if the variable is still alive.
689    pub fn on_event(&mut self, args: &FsChangesArgs) -> bool {
690        if !self.handle.is_dropped() {
691            (self.task)(&self.handle, SyncEvent::Event(args));
692        }
693        !self.handle.is_dropped()
694    }
695
696    /// Returns if the variable is still alive.
697    fn retain(&mut self, sync_debounce: Duration) -> bool {
698        if !self.handle.is_dropped() {
699            (self.task)(&self.handle, SyncEvent::Update(sync_debounce));
700        }
701        !self.handle.is_dropped()
702    }
703
704    pub fn flush_shutdown(&mut self) {
705        if !self.handle.is_dropped() {
706            (self.task)(&self.handle, SyncEvent::FlushShutdown);
707        }
708    }
709}
710enum SyncEvent<'a> {
711    Update(Duration),
712    Event(&'a FsChangesArgs),
713    Init,
714    FlushShutdown,
715}
716
717struct Watchers {
718    dirs: HashMap<PathBuf, DirWatcher>,
719    watcher: Mutex<Box<dyn notify::Watcher + Send>>, // mutex for Sync only
720    // watcher for paths that the system watcher cannot watch yet.
721    error_watcher: Option<PollWatcher>,
722    poll_interval: Duration,
723}
724impl Watchers {
725    fn new() -> Self {
726        Self {
727            dirs: HashMap::default(),
728            watcher: Mutex::new(Box::new(notify::NullWatcher)),
729            error_watcher: None,
730            poll_interval: 1.secs(),
731        }
732    }
733
734    fn watch(&mut self, file: PathBuf) -> WatcherHandle {
735        self.watch_insert(file, WatchMode::File(std::ffi::OsString::new()))
736    }
737
738    fn watch_dir(&mut self, dir: PathBuf, recursive: bool) -> WatcherHandle {
739        self.watch_insert(dir, if recursive { WatchMode::Descendants } else { WatchMode::Children })
740    }
741
742    /// path can still contain the file name if mode is `WatchMode::File("")`
743    fn watch_insert(&mut self, mut path: PathBuf, mut mode: WatchMode) -> WatcherHandle {
744        use path_absolutize::*;
745        path = match path.absolutize() {
746            Ok(p) => p.to_path_buf(),
747            Err(e) => {
748                tracing::error!("cannot watch `{}`, failed to absolutize `{}`", path.display(), e);
749                return WatcherHandle::dummy();
750            }
751        };
752
753        if let WatchMode::File(name) = &mut mode {
754            if let Some(n) = path.file_name() {
755                *name = n.to_os_string();
756                path.pop();
757            } else {
758                tracing::error!("cannot watch file `{}`", path.display());
759                return WatcherHandle::dummy();
760            }
761        }
762
763        let w = self.dirs.entry(path.clone()).or_default();
764
765        for (m, handle) in &w.modes {
766            if m == &mode {
767                if let Some(h) = handle.weak_handle().upgrade() {
768                    return WatcherHandle(h);
769                }
770            }
771        }
772
773        let (owner, handle) = Handle::new(());
774
775        let recursive = matches!(&mode, WatchMode::Descendants);
776
777        if w.modes.is_empty() {
778            if Self::inner_watch_dir(&mut **self.watcher.get_mut(), &path, recursive).is_err() {
779                Self::inner_watch_error_dir(&mut self.error_watcher, &path, recursive, self.poll_interval);
780                w.is_in_error_watcher = true;
781            }
782        } else {
783            let was_recursive = w.recursive();
784            if !was_recursive && recursive {
785                let watcher = &mut **self.watcher.get_mut();
786
787                if mem::take(&mut w.is_in_error_watcher) {
788                    Self::inner_unwatch_dir(self.error_watcher.as_mut().unwrap(), &path);
789                } else {
790                    Self::inner_unwatch_dir(watcher, &path);
791                }
792                if Self::inner_watch_dir(watcher, &path, recursive).is_err() {
793                    Self::inner_watch_error_dir(&mut self.error_watcher, &path, recursive, self.poll_interval);
794                }
795            }
796        }
797
798        w.modes.push((mode, owner));
799
800        WatcherHandle(handle)
801    }
802
803    fn cleanup(&mut self) {
804        let watcher = &mut **self.watcher.get_mut();
805        self.dirs.retain(|k, v| {
806            let r = v.retain();
807            if !r {
808                if v.is_in_error_watcher {
809                    Self::inner_unwatch_dir(self.error_watcher.as_mut().unwrap(), k);
810                } else {
811                    Self::inner_unwatch_dir(watcher, k);
812                }
813            }
814            r
815        })
816    }
817
818    fn set_poll_interval(&mut self, interval: Duration) {
819        self.poll_interval = interval;
820        if let Err(e) = self
821            .watcher
822            .get_mut()
823            .configure(notify::Config::default().with_poll_interval(interval))
824        {
825            tracing::error!("error setting the watcher poll interval: {e}");
826        }
827        if let Some(w) = &mut self.error_watcher {
828            w.configure(notify::Config::default().with_poll_interval(interval)).unwrap();
829        }
830    }
831
832    fn init(&mut self) {
833        *self.watcher.get_mut() = match notify::recommended_watcher(notify_watcher_handler()) {
834            Ok(w) => Box::new(w),
835            Err(e) => {
836                tracing::error!("error creating watcher\n{e}\nfallback to slow poll watcher");
837                match PollWatcher::new(
838                    notify_watcher_handler(),
839                    notify::Config::default().with_poll_interval(self.poll_interval),
840                ) {
841                    Ok(w) => Box::new(w),
842                    Err(e) => {
843                        tracing::error!("error creating poll watcher\n{e}\nfs watching disabled");
844                        Box::new(notify::NullWatcher)
845                    }
846                }
847            }
848        };
849
850        self.cleanup();
851
852        let watcher = &mut **self.watcher.get_mut();
853        for (dir, w) in &mut self.dirs {
854            let recursive = w.recursive();
855            if Self::inner_watch_dir(watcher, dir.as_path(), recursive).is_err() {
856                Self::inner_watch_error_dir(&mut self.error_watcher, dir, recursive, self.poll_interval);
857                w.is_in_error_watcher = true;
858            }
859        }
860    }
861
862    fn deinit(&mut self) {
863        *self.watcher.get_mut() = Box::new(notify::NullWatcher);
864    }
865
866    /// Returns Ok, or Err `PathNotFound` or `MaxFilesWatch` that can be handled using the fallback watcher.
867    fn inner_watch_dir(watcher: &mut dyn notify::Watcher, dir: &Path, recursive: bool) -> Result<(), notify::ErrorKind> {
868        let recursive = if recursive {
869            notify::RecursiveMode::Recursive
870        } else {
871            notify::RecursiveMode::NonRecursive
872        };
873        if let Err(e) = watcher.watch(dir, recursive) {
874            match e.kind {
875                notify::ErrorKind::Generic(e) => {
876                    if dir.try_exists().unwrap_or(true) {
877                        tracing::error!("cannot watch dir `{}`, {e}", dir.display())
878                    } else {
879                        return Err(notify::ErrorKind::PathNotFound);
880                    }
881                }
882                notify::ErrorKind::Io(e) => {
883                    if let io::ErrorKind::NotFound = e.kind() {
884                        return Err(notify::ErrorKind::PathNotFound);
885                    } else if dir.try_exists().unwrap_or(true) {
886                        tracing::error!("cannot watch dir `{}`, {e}", dir.display())
887                    } else {
888                        return Err(notify::ErrorKind::PathNotFound);
889                    }
890                }
891                e @ notify::ErrorKind::PathNotFound | e @ notify::ErrorKind::MaxFilesWatch => return Err(e),
892                notify::ErrorKind::InvalidConfig(e) => unreachable!("{e:?}"),
893                notify::ErrorKind::WatchNotFound => unreachable!(),
894            }
895        }
896        Ok(())
897    }
898
899    fn inner_watch_error_dir(watcher: &mut Option<PollWatcher>, dir: &Path, recursive: bool, poll_interval: Duration) {
900        let watcher = watcher.get_or_insert_with(|| {
901            PollWatcher::new(
902                notify_watcher_handler(),
903                notify::Config::default().with_poll_interval(poll_interval),
904            )
905            .unwrap()
906        });
907        Self::inner_watch_dir(watcher, dir, recursive).unwrap();
908    }
909
910    fn inner_unwatch_dir(watcher: &mut dyn notify::Watcher, dir: &Path) {
911        if let Err(e) = watcher.unwatch(dir) {
912            match e.kind {
913                notify::ErrorKind::Generic(e) => {
914                    tracing::error!("cannot unwatch dir `{}`, {e}", dir.display());
915                }
916                notify::ErrorKind::Io(e) => {
917                    tracing::error!("cannot unwatch dir `{}`, {e}", dir.display());
918                }
919                notify::ErrorKind::PathNotFound => {}  // ok?
920                notify::ErrorKind::WatchNotFound => {} // ok
921                notify::ErrorKind::InvalidConfig(_) => unreachable!(),
922                notify::ErrorKind::MaxFilesWatch => unreachable!(),
923            }
924        }
925    }
926
927    fn allow(&mut self, r: &fs_event::Event) -> bool {
928        if let notify::EventKind::Access(_) = r.kind {
929            if !r.need_rescan() {
930                return false;
931            }
932        }
933
934        for (dir, w) in &mut self.dirs {
935            let mut matched = false;
936
937            'modes: for (mode, _) in &w.modes {
938                match mode {
939                    WatchMode::File(f) => {
940                        for path in &r.paths {
941                            if let Some(name) = path.file_name() {
942                                if name == f {
943                                    if let Some(path) = path.parent() {
944                                        if path == dir {
945                                            // matched `dir/exact`
946                                            matched = true;
947                                            break 'modes;
948                                        }
949                                    }
950                                }
951                            }
952                        }
953                    }
954                    WatchMode::Children => {
955                        for path in &r.paths {
956                            if let Some(path) = path.parent() {
957                                if path == dir {
958                                    // matched `dir/*`
959                                    matched = true;
960                                    break 'modes;
961                                }
962                            }
963                        }
964                    }
965                    WatchMode::Descendants => {
966                        for path in &r.paths {
967                            if path.starts_with(dir) {
968                                // matched `dir/**`
969                                matched = true;
970                                break 'modes;
971                            }
972                        }
973                    }
974                }
975            }
976
977            if matched {
978                if mem::take(&mut w.is_in_error_watcher) {
979                    // poll watcher managed to reach the path without error, try to move to the
980                    // more performant system watcher.
981                    Self::inner_unwatch_dir(self.error_watcher.as_mut().unwrap(), dir);
982                    let recursive = w.recursive();
983                    if Self::inner_watch_dir(&mut **self.watcher.get_mut(), dir, recursive).is_err() {
984                        // failed again
985                        Self::inner_watch_error_dir(&mut self.error_watcher, dir, recursive, self.poll_interval);
986                        w.is_in_error_watcher = true;
987                    }
988                }
989                return true;
990            }
991        }
992        false
993    }
994}
995
996#[derive(PartialEq, Eq)]
997enum WatchMode {
998    File(std::ffi::OsString),
999    Children,
1000    Descendants,
1001}
1002
1003#[derive(Default)]
1004struct DirWatcher {
1005    is_in_error_watcher: bool,
1006    modes: Vec<(WatchMode, HandleOwner<()>)>,
1007}
1008impl DirWatcher {
1009    fn recursive(&self) -> bool {
1010        self.modes.iter().any(|m| matches!(&m.0, WatchMode::Descendants))
1011    }
1012
1013    fn retain(&mut self) -> bool {
1014        self.modes.retain(|(_, h)| !h.is_dropped());
1015        !self.modes.is_empty()
1016    }
1017}
1018
1019enum PollMsg {
1020    Watch(PathBuf, bool),
1021    Unwatch(PathBuf),
1022    SetConfig(notify::Config),
1023}
1024
1025/// Polling watcher.
1026///
1027/// We don't use the `notify` poll watcher to ignore path not found.
1028struct PollWatcher {
1029    sender: flume::Sender<PollMsg>,
1030    worker: Option<std::thread::JoinHandle<()>>,
1031}
1032
1033impl PollWatcher {
1034    fn send_msg(&mut self, msg: PollMsg) {
1035        if self.sender.send(msg).is_err() {
1036            if let Some(worker) = self.worker.take() {
1037                if let Err(panic) = worker.join() {
1038                    std::panic::resume_unwind(panic);
1039                }
1040            }
1041        }
1042    }
1043}
1044impl notify::Watcher for PollWatcher {
1045    fn new<F: notify::EventHandler>(mut event_handler: F, mut config: notify::Config) -> notify::Result<Self>
1046    where
1047        Self: Sized,
1048    {
1049        let (sender, rcv) = flume::unbounded();
1050        let mut dirs = HashMap::<PathBuf, PollInfo, _>::new();
1051        let worker = std::thread::Builder::new()
1052            .name(String::from("poll-watcher"))
1053            .spawn(move || {
1054                loop {
1055                    match rcv.recv_timeout(config.poll_interval_v2().unwrap_or_default()) {
1056                        Ok(msg) => match msg {
1057                            PollMsg::Watch(d, r) => {
1058                                let info = PollInfo::new(&d, r);
1059                                dirs.insert(d, info);
1060                            }
1061                            PollMsg::Unwatch(d) => {
1062                                if dirs.remove(&d).is_none() {
1063                                    event_handler.handle_event(Err(notify::Error {
1064                                        kind: notify::ErrorKind::WatchNotFound,
1065                                        paths: vec![d],
1066                                    }))
1067                                }
1068                            }
1069                            PollMsg::SetConfig(c) => config = c,
1070                        },
1071                        Err(e) => match e {
1072                            flume::RecvTimeoutError::Timeout => {}           // ok
1073                            flume::RecvTimeoutError::Disconnected => return, // stop thread
1074                        },
1075                    }
1076
1077                    for (dir, info) in &mut dirs {
1078                        info.poll(dir, &mut event_handler);
1079                    }
1080                }
1081            })
1082            .expect("failed to spawn poll-watcher thread");
1083
1084        Ok(Self {
1085            sender,
1086            worker: Some(worker),
1087        })
1088    }
1089
1090    fn watch(&mut self, path: &Path, recursive_mode: notify::RecursiveMode) -> notify::Result<()> {
1091        let msg = PollMsg::Watch(path.to_path_buf(), matches!(recursive_mode, notify::RecursiveMode::Recursive));
1092        self.send_msg(msg);
1093        Ok(())
1094    }
1095
1096    fn unwatch(&mut self, path: &Path) -> notify::Result<()> {
1097        let msg = PollMsg::Unwatch(path.to_path_buf());
1098        self.send_msg(msg);
1099        Ok(())
1100    }
1101
1102    fn configure(&mut self, option: notify::Config) -> notify::Result<bool> {
1103        let msg = PollMsg::SetConfig(option);
1104        self.send_msg(msg);
1105        Ok(true)
1106    }
1107
1108    fn kind() -> notify::WatcherKind
1109    where
1110        Self: Sized,
1111    {
1112        notify::WatcherKind::PollWatcher
1113    }
1114}
1115#[derive(Default)]
1116struct PollInfo {
1117    recursive: bool,
1118    paths: HashMap<PathBuf, PollEntry>,
1119    /// entries with `update_flag` not-eq this are removed.
1120    update_flag: bool,
1121}
1122struct PollEntry {
1123    modified: std::time::SystemTime,
1124    /// flipped by `recursive_update` if visited.
1125    update_flag: bool,
1126}
1127impl PollInfo {
1128    fn new(path: &Path, recursive: bool) -> Self {
1129        let mut paths = HashMap::new();
1130
1131        for entry in walkdir::WalkDir::new(path)
1132            .min_depth(1)
1133            .max_depth(if recursive { usize::MAX } else { 1 })
1134            .into_iter()
1135            .flatten()
1136        {
1137            if let Some(modified) = entry.metadata().ok().and_then(|m| m.modified().ok()) {
1138                paths.insert(
1139                    entry.into_path(),
1140                    PollEntry {
1141                        modified,
1142                        update_flag: false,
1143                    },
1144                );
1145            }
1146        }
1147
1148        Self {
1149            recursive,
1150            paths,
1151            update_flag: false,
1152        }
1153    }
1154
1155    fn poll(&mut self, root: &Path, handler: &mut impl notify::EventHandler) {
1156        self.update_flag = !self.update_flag;
1157        for entry in walkdir::WalkDir::new(root)
1158            .min_depth(1)
1159            .max_depth(if self.recursive { usize::MAX } else { 1 })
1160            .into_iter()
1161            .flatten()
1162        {
1163            if let Some((is_dir, modified)) = entry.metadata().ok().and_then(|m| Some((m.is_dir(), m.modified().ok()?))) {
1164                match self.paths.entry(entry.into_path()) {
1165                    hash_map::Entry::Occupied(mut e) => {
1166                        let info = e.get_mut();
1167                        info.update_flag = self.update_flag;
1168                        if info.modified != modified {
1169                            info.modified = modified;
1170
1171                            handler.handle_event(Ok(fs_event::Event {
1172                                kind: notify::EventKind::Modify(notify::event::ModifyKind::Metadata(
1173                                    notify::event::MetadataKind::WriteTime,
1174                                )),
1175                                paths: vec![e.key().clone()],
1176                                attrs: Default::default(),
1177                            }))
1178                        }
1179                    }
1180                    hash_map::Entry::Vacant(e) => {
1181                        handler.handle_event(Ok(fs_event::Event {
1182                            kind: notify::EventKind::Create(if is_dir {
1183                                notify::event::CreateKind::Folder
1184                            } else {
1185                                notify::event::CreateKind::File
1186                            }),
1187                            paths: vec![e.key().clone()],
1188                            attrs: Default::default(),
1189                        }));
1190
1191                        e.insert(PollEntry {
1192                            modified,
1193                            update_flag: self.update_flag,
1194                        });
1195                    }
1196                }
1197            }
1198        }
1199
1200        self.paths.retain(|k, e| {
1201            let retain = e.update_flag == self.update_flag;
1202            if !retain {
1203                handler.handle_event(Ok(fs_event::Event {
1204                    kind: notify::EventKind::Remove(notify::event::RemoveKind::Any),
1205                    paths: vec![k.clone()],
1206                    attrs: Default::default(),
1207                }));
1208            }
1209            retain
1210        });
1211    }
1212}