1use std::{
2 collections::{HashMap, hash_map},
3 io, mem,
4 path::{Path, PathBuf},
5 sync::{Arc, atomic::AtomicBool},
6 time::{Duration, SystemTime},
7};
8
9use atomic::{Atomic, Ordering};
10use notify::Watcher;
11use parking_lot::Mutex;
12use path_absolutize::Absolutize;
13use zng_app::{
14 DInstant, INSTANT, app_hn_once,
15 timer::{DeadlineHandle, TIMERS},
16};
17use zng_app_context::{LocalContext, app_local};
18use zng_clone_move::clmv;
19use zng_handle::{Handle, HandleOwner};
20use zng_unit::TimeUnits;
21use zng_var::{
22 AnyVar, AnyVarHookArgs, AnyWeakVar, ArcVar, ReadOnlyArcVar, VARS, Var, VarUpdateId, VarValue, WeakVar, types::WeakArcVar, var,
23};
24
25use crate::{
26 FS_CHANGES_EVENT, FsChange, FsChangeNote, FsChangeNoteHandle, FsChangesArgs, WATCHER, WatchFile, WatcherHandle, WatcherReadStatus,
27 WatcherSyncStatus, WatcherSyncWriteNote, WriteFile, fs_event,
28};
29
30use zng_task as task;
31
32#[cfg(target_has_atomic = "64")]
33use std::sync::atomic::AtomicU64;
34
/// Lock-based stand-in for `std::sync::atomic::AtomicU64` on targets without 64-bit atomics.
///
/// Only the subset of the std API used in this file is provided. The `Ordering`
/// arguments exist for signature compatibility and are ignored, the mutex
/// already serializes every access.
#[cfg(not(target_has_atomic = "64"))]
struct AtomicU64(Mutex<u64>);
#[cfg(not(target_has_atomic = "64"))]
impl AtomicU64 {
    /// Creates the cell holding `u`.
    pub const fn new(u: u64) -> Self {
        Self(Mutex::new(u))
    }

    /// Returns the current value.
    pub fn load(&self, _: Ordering) -> u64 {
        *self.0.lock()
    }

    /// Replaces the current value with `u`.
    ///
    /// Parameter order is `(value, ordering)`, the same as the std atomic, so that
    /// call sites compile unchanged on both kinds of target.
    pub fn store(&self, u: u64, _: Ordering) {
        *self.0.lock() = u;
    }
}
51
app_local! {
    // Watcher service state declared with `app_local!`, lazily built by `WatcherService::new`.
    // Accessed in this file through `WATCHER_SV.read()` and `WATCHER_SV.write()`.
    pub(crate) static WATCHER_SV: WatcherService = WatcherService::new();
}
55
/// State of the file system watcher service: configuration vars, the watcher backends,
/// the debounce buffer and the live read/sync bindings.
pub(crate) struct WatcherService {
    /// Minimum time between change notifications, compared against `debounce_oldest`.
    pub debounce: ArcVar<Duration>,
    /// Debounce handed to each `SyncWithVar` on update, applied before a var value is written.
    pub sync_debounce: ArcVar<Duration>,
    /// Poll interval forwarded to the watchers when the var changes.
    pub poll_interval: ArcVar<Duration>,
    /// Maximum time a `SyncWithVar` waits for pending IO on `flush_shutdown`.
    pub shutdown_timeout: ArcVar<Duration>,

    /// Watcher backends and the set of watched directories.
    watcher: Watchers,

    /// Instant of the last notification (or of service creation).
    debounce_oldest: DInstant,
    /// Changes received since the last notification.
    debounce_buffer: Vec<FsChange>,
    /// Timer that flushes `debounce_buffer`, set when a change arrives inside the debounce window.
    debounce_timer: Option<DeadlineHandle>,

    /// Active file/dir to read-only var bindings.
    read_to_var: Vec<ReadToVar>,
    /// Active file to read-write var bindings.
    sync_with_var: Vec<SyncWithVar>,

    /// Weak references to the notes registered by `annotate`, dead entries are pruned in `on_watcher`.
    notes: Vec<std::sync::Weak<Arc<dyn FsChangeNote>>>,
}
73impl WatcherService {
    /// Creates the service with default configuration and no active watcher backend.
    ///
    /// Defaults: 100ms debounce, 100ms sync debounce, 1s poll interval and 1 minute shutdown timeout.
    fn new() -> Self {
        Self {
            debounce: var(100.ms()),
            sync_debounce: var(100.ms()),
            poll_interval: var(1.secs()),
            shutdown_timeout: var(1.minutes()),
            watcher: Watchers::new(),
            // debounce window starts counting from creation
            debounce_oldest: INSTANT.now(),
            debounce_buffer: vec![],
            debounce_timer: None,
            read_to_var: vec![],
            sync_with_var: vec![],
            notes: vec![],
        }
    }
89
    /// Creates the watcher backend and registers every directory already requested, see `Watchers::init`.
    pub fn init_watcher(&mut self) {
        self.watcher.init();
    }
93
    /// Forwards a batch of file system changes to every read and sync binding.
    ///
    /// Bindings whose watcher handle is dropped are removed.
    pub fn event(&mut self, args: &FsChangesArgs) {
        self.read_to_var.retain_mut(|f| f.on_event(args));
        self.sync_with_var.retain_mut(|f| f.on_event(args));
    }
98
99 pub fn low_memory(&mut self) {
100 self.read_to_var.retain_mut(|v| v.retain());
101 let sync_debounce = self.sync_debounce.get();
102 self.sync_with_var.retain_mut(|v| v.retain(sync_debounce));
103 }
104
105 pub fn update(&mut self) {
106 if let Some(n) = self.poll_interval.get_new() {
107 self.watcher.set_poll_interval(n);
108 }
109 if !self.debounce_buffer.is_empty() {
110 if let Some(n) = self.debounce.get_new() {
111 if self.debounce_oldest.elapsed() >= n {
112 self.notify();
113 }
114 }
115 }
116 self.read_to_var.retain_mut(|f| f.retain());
117 let sync_debounce = self.sync_debounce.get();
118 self.sync_with_var.retain_mut(|f| f.retain(sync_debounce));
119 }
120
    /// Starts watching `file`, the watch is kept for as long as the returned handle is alive.
    pub fn watch(&mut self, file: PathBuf) -> WatcherHandle {
        self.watcher.watch(file)
    }
124
    /// Starts watching `dir`, only direct children or all descendants when `recursive` is set.
    pub fn watch_dir(&mut self, dir: PathBuf, recursive: bool) -> WatcherHandle {
        self.watcher.watch_dir(dir, recursive)
    }
128
129 pub fn read<O: VarValue>(
130 &mut self,
131 file: PathBuf,
132 init: O,
133 read: impl FnMut(io::Result<WatchFile>) -> Option<O> + Send + 'static,
134 ) -> ReadOnlyArcVar<O> {
135 let handle = self.watch(file.clone());
136 fn open(p: &Path) -> io::Result<WatchFile> {
137 WatchFile::open(p)
138 }
139 let (read, var) = ReadToVar::new(handle, file, init, open, read, || {});
140 self.read_to_var.push(read);
141 var
142 }
143
144 pub fn read_status<O, S, E>(
145 &mut self,
146 file: PathBuf,
147 init: O,
148 mut read: impl FnMut(io::Result<WatchFile>) -> Result<Option<O>, E> + Send + 'static,
149 ) -> (ReadOnlyArcVar<O>, ReadOnlyArcVar<S>)
150 where
151 O: VarValue,
152 S: WatcherReadStatus<E>,
153 {
154 let handle = self.watch(file.clone());
155 fn open(p: &Path) -> io::Result<WatchFile> {
156 WatchFile::open(p)
157 }
158 let status = var(S::reading());
159
160 let (read, var) = ReadToVar::new(
161 handle,
162 file,
163 init,
164 open,
165 clmv!(status, |d| {
167 status.set(S::reading());
168 match read(d) {
169 Ok(r) => {
170 if r.is_none() {
171 status.set(S::idle());
172 }
173 r
174 }
175 Err(e) => {
176 status.set(S::read_error(e));
177 None
178 }
179 }
180 }),
181 clmv!(status, || {
183 status.set(S::idle());
184 }),
185 );
186 self.read_to_var.push(read);
187
188 (var, status.read_only())
189 }
190
191 pub fn read_dir<O: VarValue>(
192 &mut self,
193 dir: PathBuf,
194 recursive: bool,
195 init: O,
196 read: impl FnMut(walkdir::WalkDir) -> Option<O> + Send + 'static,
197 ) -> ReadOnlyArcVar<O> {
198 let handle = self.watch_dir(dir.clone(), recursive);
199 fn open(p: &Path) -> walkdir::WalkDir {
200 walkdir::WalkDir::new(p).min_depth(1).max_depth(1)
201 }
202 fn open_recursive(p: &Path) -> walkdir::WalkDir {
203 walkdir::WalkDir::new(p).min_depth(1)
204 }
205 let (read, var) = ReadToVar::new(handle, dir, init, if recursive { open_recursive } else { open }, read, || {});
206 self.read_to_var.push(read);
207 var
208 }
209 pub fn read_dir_status<O, S, E>(
210 &mut self,
211 dir: PathBuf,
212 recursive: bool,
213 init: O,
214 mut read: impl FnMut(walkdir::WalkDir) -> Result<Option<O>, E> + Send + 'static,
215 ) -> (ReadOnlyArcVar<O>, ReadOnlyArcVar<S>)
216 where
217 O: VarValue,
218 S: WatcherReadStatus<E>,
219 {
220 let status = var(S::reading());
221
222 let handle = self.watch_dir(dir.clone(), recursive);
223 fn open(p: &Path) -> walkdir::WalkDir {
224 walkdir::WalkDir::new(p).min_depth(1).max_depth(1)
225 }
226 fn open_recursive(p: &Path) -> walkdir::WalkDir {
227 walkdir::WalkDir::new(p).min_depth(1)
228 }
229
230 let (read, var) = ReadToVar::new(
231 handle,
232 dir,
233 init,
234 if recursive { open_recursive } else { open },
235 clmv!(status, |d| {
237 status.set(S::reading());
238 match read(d) {
239 Ok(r) => {
240 if r.is_none() {
241 status.set(S::idle());
242 }
243 r
244 }
245 Err(e) => {
246 status.set(S::read_error(e));
247 None
248 }
249 }
250 }),
251 clmv!(status, || {
253 status.set(S::idle());
254 }),
255 );
256 self.read_to_var.push(read);
257
258 (var, status.read_only())
259 }
260
    /// Watches `file` and binds it to a read-write var.
    ///
    /// `read` produces new var values from the file, `write` persists var values set by other
    /// code. The returned var starts as `init`.
    pub fn sync<O: VarValue>(
        &mut self,
        file: PathBuf,
        init: O,
        read: impl FnMut(io::Result<WatchFile>) -> Option<O> + Send + 'static,
        mut write: impl FnMut(O, io::Result<WriteFile>) + Send + 'static,
    ) -> ArcVar<O> {
        let handle = self.watch(file.clone());

        // the update id and the hook callback are only needed by `sync_status`
        let (sync, var) = SyncWithVar::new(handle, file, init, read, move |o, _, f| write(o, f), |_| {});
        self.sync_with_var.push(sync);
        var
    }
274
275 pub fn sync_status<O, S, ER, EW>(
276 &mut self,
277 file: PathBuf,
278 init: O,
279 mut read: impl FnMut(io::Result<WatchFile>) -> Result<Option<O>, ER> + Send + 'static,
280 mut write: impl FnMut(O, io::Result<WriteFile>) -> Result<(), EW> + Send + 'static,
281 ) -> (ArcVar<O>, ReadOnlyArcVar<S>)
282 where
283 O: VarValue,
284 S: WatcherSyncStatus<ER, EW>,
285 {
286 let handle = self.watch(file.clone());
287 let latest_write = Arc::new(Atomic::new(VarUpdateId::never()));
288
289 let status = var(S::reading());
290 let (sync, var) = SyncWithVar::new(
291 handle,
292 file,
293 init,
294 clmv!(status, |f| {
296 status.set(S::reading());
297 match read(f) {
298 Ok(r) => {
299 if r.is_none() {
300 status.set(S::idle());
301 }
302 r
303 }
304 Err(e) => {
305 status.set(S::read_error(e));
306 None
307 }
308 }
309 }),
310 clmv!(status, latest_write, |o, o_id, f| {
312 status.set(S::writing()); match write(o, f) {
314 Ok(()) => {
315 if latest_write.load(Ordering::Relaxed) == o_id {
316 status.set(S::idle());
317 }
318 }
319 Err(e) => {
320 status.set(S::write_error(e));
321 }
322 }
323 }),
324 clmv!(status, |is_read| {
326 status.set(if is_read {
327 S::idle()
328 } else {
329 let id = VARS.update_id();
330 latest_write.store(id, Ordering::Relaxed);
331
332 S::writing()
333 });
334 }),
335 );
336
337 self.sync_with_var.push(sync);
338
339 (var, status.read_only())
340 }
341
342 fn on_watcher(&mut self, r: Result<fs_event::Event, fs_event::Error>) {
343 if let Ok(r) = &r {
344 if !self.watcher.allow(r) {
345 return;
347 }
348 }
349
350 let notify = self.debounce_oldest.elapsed() >= self.debounce.get();
351
352 let mut notes = Vec::with_capacity(self.notes.len());
353 self.notes.retain(|n| match n.upgrade() {
354 Some(n) => {
355 notes.push(Arc::clone(&*n));
356 true
357 }
358 None => false,
359 });
360
361 self.debounce_buffer.push(FsChange { notes, event: r });
362
363 if notify {
364 self.notify();
365 } else if self.debounce_timer.is_none() {
366 self.debounce_timer = Some(TIMERS.on_deadline(
367 self.debounce.get(),
368 app_hn_once!(|_| {
369 WATCHER_SV.write().on_debounce_timer();
370 }),
371 ));
372 }
373 }
374
    /// Registers `note` to be attached to every change buffered by `on_watcher`.
    ///
    /// The service only keeps a weak reference, the note stops being attached once the
    /// allocation held by the returned handle is dropped.
    pub fn annotate(&mut self, note: Arc<dyn FsChangeNote>) -> FsChangeNoteHandle {
        let handle = Arc::new(note);
        self.notes.push(Arc::downgrade(&handle));
        FsChangeNoteHandle(handle)
    }
380
381 fn on_debounce_timer(&mut self) {
382 if !self.debounce_buffer.is_empty() {
383 self.notify();
384 }
385 }
386
387 fn notify(&mut self) {
388 let changes = mem::take(&mut self.debounce_buffer);
389 let now = INSTANT.now();
390 self.debounce_oldest = now;
391 self.debounce_timer = None;
392
393 FS_CHANGES_EVENT.notify(FsChangesArgs::new(now, Default::default(), changes));
394 }
395
    /// Replaces the watcher backend with a no-op watcher and hands over the sync bindings
    /// to the caller, leaving the service with none.
    pub(crate) fn shutdown(&mut self) -> Vec<SyncWithVar> {
        self.watcher.deinit();
        mem::take(&mut self.sync_with_var)
    }
401}
/// Creates the event handler given to the `notify` watchers.
///
/// The handler captures the context of the caller and forwards each watcher result to
/// `WatcherService::on_watcher` inside that context.
fn notify_watcher_handler() -> impl notify::EventHandler {
    let mut ctx = LocalContext::capture();
    move |r| ctx.with_context(|| WATCHER_SV.write().on_watcher(r))
}
406
/// Binding of a watched path to a read-only var.
struct ReadToVar {
    /// Type erased handler built in `new`, decides if a read must run and spawns it.
    read: Box<dyn Fn(&Arc<AtomicBool>, &WatcherHandle, ReadEvent) + Send + Sync>,
    /// Set when a read is requested, cleared by the read task before each read.
    pending: Arc<AtomicBool>,
    /// Keeps the path watched, force dropped once the var has no more strong references.
    handle: WatcherHandle,
}
412impl ReadToVar {
    /// Creates the binding and the var, and requests the initial read.
    ///
    /// * `handle` - watcher handle for `path`, force dropped when the var is dropped.
    /// * `path` - watched path, made absolute when possible.
    /// * `init` - initial value of the var.
    /// * `load` - opens the path into the input of `read`.
    /// * `read` - produces a new var value, `None` leaves the var unchanged.
    /// * `on_modify` - called inside the var modify closure after the new value is set.
    fn new<O: VarValue, R: 'static>(
        handle: WatcherHandle,
        mut path: PathBuf,
        init: O,
        load: fn(&Path) -> R,
        read: impl FnMut(R) -> Option<O> + Send + 'static,
        on_modify: impl Fn() + Send + Sync + 'static,
    ) -> (Self, ReadOnlyArcVar<O>) {
        // best effort, the path is used as given when it cannot be made absolute
        if let Ok(p) = path.absolutize() {
            path = p.into_owned();
        }
        let path = Arc::new(path);
        let var = var(init);
        let on_modify = Arc::new(on_modify);

        let pending = Arc::new(AtomicBool::new(false));
        // the mutex doubles as the "read task is running" flag
        let read = Arc::new(Mutex::new(read));
        let wk_var = var.downgrade();

        let read = Box::new(move |pending: &Arc<AtomicBool>, handle: &WatcherHandle, ev: ReadEvent| {
            // nobody holds the var anymore, stop watching
            if wk_var.strong_count() == 0 {
                handle.clone().force_drop();
                return;
            }

            let spawn = match ev {
                // updates only run the dropped var check above
                ReadEvent::Update => false,
                // skip the path scan when a read is already requested
                ReadEvent::Event(args) => !pending.load(Ordering::Relaxed) && args.events_for_path(&path).next().is_some(),
                ReadEvent::Init => true,
            };

            if !spawn {
                return;
            }

            pending.store(true, Ordering::Relaxed);
            if read.try_lock().is_none() {
                // a read task holds the lock, it re-checks `pending` after each read
                return;
            }
            task::spawn_wait(clmv!(read, wk_var, path, handle, pending, on_modify, || {
                let mut read = read.lock();
                // repeat for as long as new reads are requested while this task runs
                while pending.swap(false, Ordering::Relaxed) {
                    if let Some(update) = read(load(path.as_path())) {
                        if let Some(var) = wk_var.upgrade() {
                            var.modify(clmv!(on_modify, |vm| {
                                vm.set(update);
                                on_modify();
                            }));
                        } else {
                            // var dropped during the read
                            handle.force_drop();
                            break;
                        }
                    }
                }
            }));
        });
        read(&pending, &handle, ReadEvent::Init);

        (Self { read, pending, handle }, var.read_only())
    }
476
477 pub fn on_event(&mut self, args: &FsChangesArgs) -> bool {
481 if !self.handle.is_dropped() {
482 (self.read)(&self.pending, &self.handle, ReadEvent::Event(args));
483 }
484 !self.handle.is_dropped()
485 }
486
487 fn retain(&mut self) -> bool {
489 if !self.handle.is_dropped() {
490 (self.read)(&self.pending, &self.handle, ReadEvent::Update);
491 }
492 !self.handle.is_dropped()
493 }
494}
/// Reason the `ReadToVar` handler is called.
enum ReadEvent<'a> {
    /// Service update or low memory pass, never causes a read.
    Update,
    /// File system changes, causes a read when any change is for the bound path.
    Event(&'a FsChangesArgs),
    /// Binding just created, always causes a read.
    Init,
}
500
/// Binding of a watched file to a read-write var.
pub(crate) struct SyncWithVar {
    /// Type erased handler built in `new`, decides if a read or write must run and spawns it.
    task: Box<dyn Fn(&WatcherHandle, SyncEvent) + Send + Sync>,
    /// Keeps the file watched, force dropped once the var is dropped.
    handle: WatcherHandle,
}
505impl SyncWithVar {
506 fn new<O, R, W, U>(handle: WatcherHandle, mut file: PathBuf, init: O, read: R, write: W, var_hook_and_modify: U) -> (Self, ArcVar<O>)
507 where
508 O: VarValue,
509 R: FnMut(io::Result<WatchFile>) -> Option<O> + Send + 'static,
510 W: FnMut(O, VarUpdateId, io::Result<WriteFile>) + Send + 'static,
511 U: Fn(bool) + Send + Sync + 'static,
512 {
513 if let Ok(p) = file.absolutize() {
514 file = p.into_owned();
515 }
516
517 let path = Arc::new(WatcherSyncWriteNote(file));
518 let latest_from_read = Arc::new(AtomicBool::new(false));
519
520 let var_hook_and_modify = Arc::new(var_hook_and_modify);
521
522 let var = var(init);
523 var.hook_any(Box::new(clmv!(
524 path,
525 latest_from_read,
526 var_hook_and_modify,
527 |args: &AnyVarHookArgs| {
528 let is_read = args.downcast_tags::<Arc<WatcherSyncWriteNote>>().any(|n| n == &path);
529 latest_from_read.store(is_read, Ordering::Relaxed);
530 var_hook_and_modify(is_read);
531 true
532 }
533 )))
534 .perm();
535
536 type PendingFlag = u8;
537 const READ: PendingFlag = 0b01;
538 const WRITE: PendingFlag = 0b11;
539
540 struct TaskData<R, W, O: VarValue> {
541 pending: Atomic<PendingFlag>,
542 read_write: Mutex<(R, W)>,
543 wk_var: WeakArcVar<O>,
544 last_write: AtomicU64, }
546 let task_data = Arc::new(TaskData {
547 pending: Atomic::new(0),
548 read_write: Mutex::new((read, write)),
549 wk_var: var.downgrade(),
550 last_write: AtomicU64::new(0),
551 });
552
553 let task = Box::new(move |handle: &WatcherHandle, ev: SyncEvent| {
555 let var = match task_data.wk_var.upgrade() {
556 Some(v) => v,
557 None => {
558 handle.clone().force_drop();
559 return;
560 }
561 };
562
563 let mut debounce = None;
564
565 let mut pending = 0;
566
567 match ev {
568 SyncEvent::Update(sync_debounce) => {
569 if var.is_new() && !latest_from_read.load(Ordering::Relaxed) {
570 debounce = Some(sync_debounce);
571 pending |= WRITE;
572 } else {
573 return;
574 }
575 }
576 SyncEvent::Event(args) => {
577 if args.rescan() {
578 pending |= READ;
579 } else {
580 'ev: for ev in args.changes_for_path(&path) {
581 for note in ev.notes::<WatcherSyncWriteNote>() {
582 if path.as_path() == note.as_path() {
583 continue 'ev;
585 }
586 }
587
588 pending |= READ;
589 break;
590 }
591 if pending == 0 {
592 return;
593 }
594 }
595 }
596 SyncEvent::Init => {
597 if path.exists() {
598 pending |= READ;
599 } else {
600 pending |= WRITE;
601 }
602 }
603 SyncEvent::FlushShutdown => {
604 let timeout = WATCHER_SV.read().shutdown_timeout.get();
605 if task_data.read_write.try_lock_for(timeout).is_none() {
606 tracing::error!("not all io operations finished on shutdown, timeout after {timeout:?}");
607 }
608 return;
609 }
610 };
611 drop(var);
612
613 task_data.pending.fetch_or(pending, Ordering::Relaxed);
614
615 if task_data.read_write.try_lock().is_none() {
616 return;
618 }
619 task::spawn_wait(clmv!(task_data, path, var_hook_and_modify, handle, || {
620 let mut read_write = task_data.read_write.lock();
621 let (read, write) = &mut *read_write;
622
623 loop {
624 let pending = task_data.pending.swap(0, Ordering::Relaxed);
625
626 if pending == WRITE {
627 if let Some(d) = debounce {
628 let now_ms = SystemTime::now()
629 .duration_since(SystemTime::UNIX_EPOCH)
630 .unwrap_or_default()
631 .as_millis() as u64;
632 let prev_ms = task_data.last_write.load(Ordering::Relaxed);
633 let elapsed = Duration::from_millis(now_ms - prev_ms);
634 if elapsed < d {
635 std::thread::sleep(d - elapsed);
636 }
637 task_data.last_write.store(now_ms, Ordering::Relaxed);
638 }
639
640 let (id, value) = if let Some(var) = task_data.wk_var.upgrade() {
641 (var.last_update(), var.get())
642 } else {
643 handle.force_drop();
644 return;
645 };
646
647 {
648 let _note = WATCHER.annotate(path.clone());
649 write(value, id, WriteFile::open(path.to_path_buf()));
650 }
651
652 if task_data.wk_var.strong_count() == 0 {
653 handle.force_drop();
654 return;
655 }
656 } else if pending == READ {
657 if task_data.wk_var.strong_count() == 0 {
658 handle.force_drop();
659 return;
660 }
661
662 if let Some(update) = read(WatchFile::open(path.as_path())) {
663 if let Some(var) = task_data.wk_var.upgrade() {
664 var.modify(clmv!(path, var_hook_and_modify, |vm| {
665 vm.set(update);
666 vm.push_tag(path);
667 var_hook_and_modify(true);
668 }));
669 } else {
670 handle.force_drop();
671 return;
672 }
673 }
674 } else {
675 return;
676 }
677 }
678 }));
679 });
680
681 task(&handle, SyncEvent::Init);
682
683 (Self { task, handle }, var)
684 }
685
686 pub fn on_event(&mut self, args: &FsChangesArgs) -> bool {
690 if !self.handle.is_dropped() {
691 (self.task)(&self.handle, SyncEvent::Event(args));
692 }
693 !self.handle.is_dropped()
694 }
695
696 fn retain(&mut self, sync_debounce: Duration) -> bool {
698 if !self.handle.is_dropped() {
699 (self.task)(&self.handle, SyncEvent::Update(sync_debounce));
700 }
701 !self.handle.is_dropped()
702 }
703
704 pub fn flush_shutdown(&mut self) {
705 if !self.handle.is_dropped() {
706 (self.task)(&self.handle, SyncEvent::FlushShutdown);
707 }
708 }
709}
/// Reason the `SyncWithVar` handler is called.
enum SyncEvent<'a> {
    /// Service update or low memory pass, carries the sync debounce to apply before a write.
    Update(Duration),
    /// File system changes, causes a read when a change for the file was not caused by the binding.
    Event(&'a FsChangesArgs),
    /// Binding just created, reads the file if it exists, writes the initial value otherwise.
    Init,
    /// Shutdown, blocks until the running IO task finishes or the shutdown timeout elapses.
    FlushShutdown,
}
716
/// Watcher backends and the directories registered on them.
struct Watchers {
    /// Watched directories, files are watched through their parent directory.
    dirs: HashMap<PathBuf, DirWatcher>,
    /// Main watcher, a `NullWatcher` before `init` and after `deinit`.
    watcher: Mutex<Box<dyn notify::Watcher + Send>>,
    /// Fallback poll watcher for directories the main watcher failed to watch, created on demand.
    error_watcher: Option<PollWatcher>,
    /// Interval used when creating or reconfiguring poll watchers.
    poll_interval: Duration,
}
724impl Watchers {
    /// Creates an inert instance: no directories, a `NullWatcher` backend and a 1s poll interval.
    fn new() -> Self {
        Self {
            dirs: HashMap::default(),
            // replaced by a real watcher in `init`
            watcher: Mutex::new(Box::new(notify::NullWatcher)),
            error_watcher: None,
            poll_interval: 1.secs(),
        }
    }
733
734 fn watch(&mut self, file: PathBuf) -> WatcherHandle {
735 self.watch_insert(file, WatchMode::File(std::ffi::OsString::new()))
736 }
737
738 fn watch_dir(&mut self, dir: PathBuf, recursive: bool) -> WatcherHandle {
739 self.watch_insert(dir, if recursive { WatchMode::Descendants } else { WatchMode::Children })
740 }
741
742 fn watch_insert(&mut self, mut path: PathBuf, mut mode: WatchMode) -> WatcherHandle {
744 use path_absolutize::*;
745 path = match path.absolutize() {
746 Ok(p) => p.to_path_buf(),
747 Err(e) => {
748 tracing::error!("cannot watch `{}`, failed to absolutize `{}`", path.display(), e);
749 return WatcherHandle::dummy();
750 }
751 };
752
753 if let WatchMode::File(name) = &mut mode {
754 if let Some(n) = path.file_name() {
755 *name = n.to_os_string();
756 path.pop();
757 } else {
758 tracing::error!("cannot watch file `{}`", path.display());
759 return WatcherHandle::dummy();
760 }
761 }
762
763 let w = self.dirs.entry(path.clone()).or_default();
764
765 for (m, handle) in &w.modes {
766 if m == &mode {
767 if let Some(h) = handle.weak_handle().upgrade() {
768 return WatcherHandle(h);
769 }
770 }
771 }
772
773 let (owner, handle) = Handle::new(());
774
775 let recursive = matches!(&mode, WatchMode::Descendants);
776
777 if w.modes.is_empty() {
778 if Self::inner_watch_dir(&mut **self.watcher.get_mut(), &path, recursive).is_err() {
779 Self::inner_watch_error_dir(&mut self.error_watcher, &path, recursive, self.poll_interval);
780 w.is_in_error_watcher = true;
781 }
782 } else {
783 let was_recursive = w.recursive();
784 if !was_recursive && recursive {
785 let watcher = &mut **self.watcher.get_mut();
786
787 if mem::take(&mut w.is_in_error_watcher) {
788 Self::inner_unwatch_dir(self.error_watcher.as_mut().unwrap(), &path);
789 } else {
790 Self::inner_unwatch_dir(watcher, &path);
791 }
792 if Self::inner_watch_dir(watcher, &path, recursive).is_err() {
793 Self::inner_watch_error_dir(&mut self.error_watcher, &path, recursive, self.poll_interval);
794 }
795 }
796 }
797
798 w.modes.push((mode, owner));
799
800 WatcherHandle(handle)
801 }
802
803 fn cleanup(&mut self) {
804 let watcher = &mut **self.watcher.get_mut();
805 self.dirs.retain(|k, v| {
806 let r = v.retain();
807 if !r {
808 if v.is_in_error_watcher {
809 Self::inner_unwatch_dir(self.error_watcher.as_mut().unwrap(), k);
810 } else {
811 Self::inner_unwatch_dir(watcher, k);
812 }
813 }
814 r
815 })
816 }
817
818 fn set_poll_interval(&mut self, interval: Duration) {
819 self.poll_interval = interval;
820 if let Err(e) = self
821 .watcher
822 .get_mut()
823 .configure(notify::Config::default().with_poll_interval(interval))
824 {
825 tracing::error!("error setting the watcher poll interval: {e}");
826 }
827 if let Some(w) = &mut self.error_watcher {
828 w.configure(notify::Config::default().with_poll_interval(interval)).unwrap();
829 }
830 }
831
    /// Creates the watcher backend and registers all directories requested so far.
    ///
    /// Tries the recommended watcher first, then a `PollWatcher`, then gives up and keeps a
    /// `NullWatcher`; each failure is logged.
    fn init(&mut self) {
        *self.watcher.get_mut() = match notify::recommended_watcher(notify_watcher_handler()) {
            Ok(w) => Box::new(w),
            Err(e) => {
                tracing::error!("error creating watcher\n{e}\nfallback to slow poll watcher");
                match PollWatcher::new(
                    notify_watcher_handler(),
                    notify::Config::default().with_poll_interval(self.poll_interval),
                ) {
                    Ok(w) => Box::new(w),
                    Err(e) => {
                        tracing::error!("error creating poll watcher\n{e}\nfs watching disabled");
                        Box::new(notify::NullWatcher)
                    }
                }
            }
        };

        // drop directories whose handles were all dropped before init
        self.cleanup();

        let watcher = &mut **self.watcher.get_mut();
        for (dir, w) in &mut self.dirs {
            let recursive = w.recursive();
            if Self::inner_watch_dir(watcher, dir.as_path(), recursive).is_err() {
                Self::inner_watch_error_dir(&mut self.error_watcher, dir, recursive, self.poll_interval);
                w.is_in_error_watcher = true;
            }
        }
    }
861
    /// Replaces the main watcher with a `NullWatcher`, dropping the previous backend.
    fn deinit(&mut self) {
        *self.watcher.get_mut() = Box::new(notify::NullWatcher);
    }
865
866 fn inner_watch_dir(watcher: &mut dyn notify::Watcher, dir: &Path, recursive: bool) -> Result<(), notify::ErrorKind> {
868 let recursive = if recursive {
869 notify::RecursiveMode::Recursive
870 } else {
871 notify::RecursiveMode::NonRecursive
872 };
873 if let Err(e) = watcher.watch(dir, recursive) {
874 match e.kind {
875 notify::ErrorKind::Generic(e) => {
876 if dir.try_exists().unwrap_or(true) {
877 tracing::error!("cannot watch dir `{}`, {e}", dir.display())
878 } else {
879 return Err(notify::ErrorKind::PathNotFound);
880 }
881 }
882 notify::ErrorKind::Io(e) => {
883 if let io::ErrorKind::NotFound = e.kind() {
884 return Err(notify::ErrorKind::PathNotFound);
885 } else if dir.try_exists().unwrap_or(true) {
886 tracing::error!("cannot watch dir `{}`, {e}", dir.display())
887 } else {
888 return Err(notify::ErrorKind::PathNotFound);
889 }
890 }
891 e @ notify::ErrorKind::PathNotFound | e @ notify::ErrorKind::MaxFilesWatch => return Err(e),
892 notify::ErrorKind::InvalidConfig(e) => unreachable!("{e:?}"),
893 notify::ErrorKind::WatchNotFound => unreachable!(),
894 }
895 }
896 Ok(())
897 }
898
    /// Registers `dir` on the fallback poll watcher, creating the watcher on first use.
    ///
    /// Both unwraps rely on `PollWatcher::new` and `PollWatcher::watch` always returning `Ok`.
    fn inner_watch_error_dir(watcher: &mut Option<PollWatcher>, dir: &Path, recursive: bool, poll_interval: Duration) {
        let watcher = watcher.get_or_insert_with(|| {
            PollWatcher::new(
                notify_watcher_handler(),
                notify::Config::default().with_poll_interval(poll_interval),
            )
            .unwrap()
        });
        Self::inner_watch_dir(watcher, dir, recursive).unwrap();
    }
909
910 fn inner_unwatch_dir(watcher: &mut dyn notify::Watcher, dir: &Path) {
911 if let Err(e) = watcher.unwatch(dir) {
912 match e.kind {
913 notify::ErrorKind::Generic(e) => {
914 tracing::error!("cannot unwatch dir `{}`, {e}", dir.display());
915 }
916 notify::ErrorKind::Io(e) => {
917 tracing::error!("cannot unwatch dir `{}`, {e}", dir.display());
918 }
919 notify::ErrorKind::PathNotFound => {} notify::ErrorKind::WatchNotFound => {} notify::ErrorKind::InvalidConfig(_) => unreachable!(),
922 notify::ErrorKind::MaxFilesWatch => unreachable!(),
923 }
924 }
925 }
926
    /// Returns `true` if the event `r` affects any registered watch.
    ///
    /// Access events are rejected unless they request a rescan. When the first matching
    /// directory is in the fallback watcher it is moved back to the main watcher, returning
    /// to the fallback if that fails again.
    fn allow(&mut self, r: &fs_event::Event) -> bool {
        if let notify::EventKind::Access(_) = r.kind {
            if !r.need_rescan() {
                return false;
            }
        }

        for (dir, w) in &mut self.dirs {
            let mut matched = false;

            'modes: for (mode, _) in &w.modes {
                match mode {
                    // the event path must be exactly `dir/<file name>`
                    WatchMode::File(f) => {
                        for path in &r.paths {
                            if let Some(name) = path.file_name() {
                                if name == f {
                                    if let Some(path) = path.parent() {
                                        if path == dir {
                                            matched = true;
                                            break 'modes;
                                        }
                                    }
                                }
                            }
                        }
                    }
                    // the event path must be a direct child of `dir`
                    WatchMode::Children => {
                        for path in &r.paths {
                            if let Some(path) = path.parent() {
                                if path == dir {
                                    matched = true;
                                    break 'modes;
                                }
                            }
                        }
                    }
                    // the event path must be inside `dir`, `starts_with` also accepts `dir` itself
                    WatchMode::Descendants => {
                        for path in &r.paths {
                            if path.starts_with(dir) {
                                matched = true;
                                break 'modes;
                            }
                        }
                    }
                }
            }

            if matched {
                if mem::take(&mut w.is_in_error_watcher) {
                    // an event arrived for a dir in the fallback watcher, try the main watcher again
                    Self::inner_unwatch_dir(self.error_watcher.as_mut().unwrap(), dir);
                    let recursive = w.recursive();
                    if Self::inner_watch_dir(&mut **self.watcher.get_mut(), dir, recursive).is_err() {
                        // still failing, keep it in the fallback watcher
                        Self::inner_watch_error_dir(&mut self.error_watcher, dir, recursive, self.poll_interval);
                        w.is_in_error_watcher = true;
                    }
                }
                return true;
            }
        }
        false
    }
994}
995
/// What is watched inside a registered directory.
#[derive(PartialEq, Eq)]
enum WatchMode {
    /// A single file in the directory, identified by its file name.
    File(std::ffi::OsString),
    /// Direct children of the directory.
    Children,
    /// All paths inside the directory, the directory is watched recursively.
    Descendants,
}
1002
1003#[derive(Default)]
1004struct DirWatcher {
1005 is_in_error_watcher: bool,
1006 modes: Vec<(WatchMode, HandleOwner<()>)>,
1007}
1008impl DirWatcher {
1009 fn recursive(&self) -> bool {
1010 self.modes.iter().any(|m| matches!(&m.0, WatchMode::Descendants))
1011 }
1012
1013 fn retain(&mut self) -> bool {
1014 self.modes.retain(|(_, h)| !h.is_dropped());
1015 !self.modes.is_empty()
1016 }
1017}
1018
/// Message sent to the `PollWatcher` worker thread.
enum PollMsg {
    /// Start polling the directory, the flag enables recursive polling.
    Watch(PathBuf, bool),
    /// Stop polling the directory.
    Unwatch(PathBuf),
    /// Replace the watcher config, the worker reads the poll interval from it.
    SetConfig(notify::Config),
}
1024
/// Watcher that polls directories in a dedicated worker thread.
///
/// Used as the fallback for directories the main watcher cannot watch and as the main
/// watcher when the recommended one cannot be created.
struct PollWatcher {
    /// Channel to the worker thread, the worker exits when it disconnects.
    sender: flume::Sender<PollMsg>,
    /// Worker thread handle, taken when the worker is found dead to propagate its panic.
    worker: Option<std::thread::JoinHandle<()>>,
}
1032
1033impl PollWatcher {
1034 fn send_msg(&mut self, msg: PollMsg) {
1035 if self.sender.send(msg).is_err() {
1036 if let Some(worker) = self.worker.take() {
1037 if let Err(panic) = worker.join() {
1038 std::panic::resume_unwind(panic);
1039 }
1040 }
1041 }
1042 }
1043}
impl notify::Watcher for PollWatcher {
    /// Spawns the `poll-watcher` worker thread and returns the watcher connected to it.
    ///
    /// The worker waits for a message up to the configured poll interval, applies it, then
    /// polls every watched directory; this repeats until the sender side is dropped.
    ///
    /// # Panics
    ///
    /// Panics if the worker thread cannot be spawned.
    fn new<F: notify::EventHandler>(mut event_handler: F, mut config: notify::Config) -> notify::Result<Self>
    where
        Self: Sized,
    {
        let (sender, rcv) = flume::unbounded();
        let mut dirs = HashMap::<PathBuf, PollInfo, _>::new();
        let worker = std::thread::Builder::new()
            .name(String::from("poll-watcher"))
            .spawn(move || {
                loop {
                    // NOTE(review): a config without poll interval gives a zero timeout here, the
                    // loop would then poll without pause; callers in this file always set it
                    match rcv.recv_timeout(config.poll_interval_v2().unwrap_or_default()) {
                        Ok(msg) => match msg {
                            PollMsg::Watch(d, r) => {
                                // snapshot the current entries so only later changes are reported
                                let info = PollInfo::new(&d, r);
                                dirs.insert(d, info);
                            }
                            PollMsg::Unwatch(d) => {
                                if dirs.remove(&d).is_none() {
                                    event_handler.handle_event(Err(notify::Error {
                                        kind: notify::ErrorKind::WatchNotFound,
                                        paths: vec![d],
                                    }))
                                }
                            }
                            PollMsg::SetConfig(c) => config = c,
                        },
                        Err(e) => match e {
                            // interval elapsed, poll below
                            flume::RecvTimeoutError::Timeout => {}
                            // watcher dropped, exit the thread
                            flume::RecvTimeoutError::Disconnected => return,
                        },
                    }

                    for (dir, info) in &mut dirs {
                        info.poll(dir, &mut event_handler);
                    }
                }
            })
            .expect("failed to spawn poll-watcher thread");

        Ok(Self {
            sender,
            worker: Some(worker),
        })
    }

    /// Requests the worker to start polling `path`, always returns `Ok`.
    fn watch(&mut self, path: &Path, recursive_mode: notify::RecursiveMode) -> notify::Result<()> {
        let msg = PollMsg::Watch(path.to_path_buf(), matches!(recursive_mode, notify::RecursiveMode::Recursive));
        self.send_msg(msg);
        Ok(())
    }

    /// Requests the worker to stop polling `path`, always returns `Ok`.
    ///
    /// A path that was not watched is reported by the worker through the event handler.
    fn unwatch(&mut self, path: &Path) -> notify::Result<()> {
        let msg = PollMsg::Unwatch(path.to_path_buf());
        self.send_msg(msg);
        Ok(())
    }

    /// Requests the worker to replace its config, always returns `Ok(true)`.
    fn configure(&mut self, option: notify::Config) -> notify::Result<bool> {
        let msg = PollMsg::SetConfig(option);
        self.send_msg(msg);
        Ok(true)
    }

    /// Identifies this watcher as a poll watcher.
    fn kind() -> notify::WatcherKind
    where
        Self: Sized,
    {
        notify::WatcherKind::PollWatcher
    }
}
/// Polling state of one directory watched by `PollWatcher`.
#[derive(Default)]
struct PollInfo {
    /// If the whole tree is polled, otherwise only the direct children.
    recursive: bool,
    /// Last known modified time of each entry found.
    paths: HashMap<PathBuf, PollEntry>,
    /// Flipped on every poll; entries not stamped with the current value were not found
    /// and are reported as removed.
    update_flag: bool,
}
/// Last observed state of one path inside a polled directory.
struct PollEntry {
    /// Modified time seen in the last poll that found the path.
    modified: std::time::SystemTime,
    /// Value of `PollInfo::update_flag` in the last poll that found the path.
    update_flag: bool,
}
1127impl PollInfo {
1128 fn new(path: &Path, recursive: bool) -> Self {
1129 let mut paths = HashMap::new();
1130
1131 for entry in walkdir::WalkDir::new(path)
1132 .min_depth(1)
1133 .max_depth(if recursive { usize::MAX } else { 1 })
1134 .into_iter()
1135 .flatten()
1136 {
1137 if let Some(modified) = entry.metadata().ok().and_then(|m| m.modified().ok()) {
1138 paths.insert(
1139 entry.into_path(),
1140 PollEntry {
1141 modified,
1142 update_flag: false,
1143 },
1144 );
1145 }
1146 }
1147
1148 Self {
1149 recursive,
1150 paths,
1151 update_flag: false,
1152 }
1153 }
1154
    /// Walks `root` again and reports the differences from the previous snapshot to `handler`.
    ///
    /// Emits a metadata write-time modify event for entries with a different modified time,
    /// a create event for new entries and a remove event for entries no longer found.
    fn poll(&mut self, root: &Path, handler: &mut impl notify::EventHandler) {
        // new stamp for this pass, entries left with the old stamp were not found
        self.update_flag = !self.update_flag;
        for entry in walkdir::WalkDir::new(root)
            .min_depth(1)
            .max_depth(if self.recursive { usize::MAX } else { 1 })
            .into_iter()
            .flatten()
        {
            // entries without readable metadata or modified time are skipped
            if let Some((is_dir, modified)) = entry.metadata().ok().and_then(|m| Some((m.is_dir(), m.modified().ok()?))) {
                match self.paths.entry(entry.into_path()) {
                    hash_map::Entry::Occupied(mut e) => {
                        let info = e.get_mut();
                        info.update_flag = self.update_flag;
                        if info.modified != modified {
                            info.modified = modified;

                            handler.handle_event(Ok(fs_event::Event {
                                kind: notify::EventKind::Modify(notify::event::ModifyKind::Metadata(
                                    notify::event::MetadataKind::WriteTime,
                                )),
                                paths: vec![e.key().clone()],
                                attrs: Default::default(),
                            }))
                        }
                    }
                    hash_map::Entry::Vacant(e) => {
                        handler.handle_event(Ok(fs_event::Event {
                            kind: notify::EventKind::Create(if is_dir {
                                notify::event::CreateKind::Folder
                            } else {
                                notify::event::CreateKind::File
                            }),
                            paths: vec![e.key().clone()],
                            attrs: Default::default(),
                        }));

                        e.insert(PollEntry {
                            modified,
                            update_flag: self.update_flag,
                        });
                    }
                }
            }
        }

        // entries not stamped in this pass are gone
        self.paths.retain(|k, e| {
            let retain = e.update_flag == self.update_flag;
            if !retain {
                handler.handle_event(Ok(fs_event::Event {
                    kind: notify::EventKind::Remove(notify::event::RemoveKind::Any),
                    paths: vec![k.clone()],
                    attrs: Default::default(),
                }));
            }
            retain
        });
    }
1212}