1#![doc(html_favicon_url = "https://zng-ui.github.io/res/zng-logo-icon.png")]
2#![doc(html_logo_url = "https://zng-ui.github.io/res/zng-logo.png")]
3#![doc = include_str!(concat!("../", std::env!("CARGO_PKG_README")))]
21#![expect(clippy::type_complexity)]
23#![warn(unused_extern_crates)]
24#![warn(missing_docs)]
25
26use std::{
27 any::Any,
28 fmt, fs,
29 io::{self, Write as _},
30 ops,
31 path::{Path, PathBuf},
32 sync::Arc,
33 time::Duration,
34};
35
36use parking_lot::Mutex;
37use path_absolutize::Absolutize;
38use zng_app::{
39 DInstant, INSTANT,
40 event::event_args,
41 handler::{Handler, HandlerExt as _},
42};
43use zng_clone_move::clmv;
44use zng_handle::Handle;
45use zng_txt::Txt;
46use zng_unit::TimeUnits;
47use zng_var::{AnyVar, BoxAnyVarValue, Var, VarHandle, VarValue, WeakAnyVar, var};
48
49mod service;
50use service::*;
51
52mod lock;
53use lock::*;
54
55pub struct WATCHER;
59impl WATCHER {
60 pub fn debounce(&self) -> Var<Duration> {
68 WATCHER_SV.read().debounce.clone()
69 }
70
71 pub fn sync_debounce(&self) -> Var<Duration> {
79 WATCHER_SV.read().sync_debounce.clone()
80 }
81
82 pub fn poll_interval(&self) -> Var<Duration> {
90 WATCHER_SV.read().poll_interval.clone()
91 }
92
93 pub fn shutdown_timeout(&self) -> Var<Duration> {
97 WATCHER_SV.read().shutdown_timeout.clone()
98 }
99
100 pub fn watch(&self, file: impl Into<PathBuf>) -> WatcherHandle {
112 WATCHER_SV.write().watch(file.into())
113 }
114
115 pub fn watch_dir(&self, dir: impl Into<PathBuf>, recursive: bool) -> WatcherHandle {
125 WATCHER_SV.write().watch_dir(dir.into(), recursive)
126 }
127
128 pub fn read<O: VarValue>(
136 &self,
137 file: impl Into<PathBuf>,
138 init: O,
139 mut read: impl FnMut(io::Result<WatchFile>) -> Option<O> + Send + 'static,
140 ) -> Var<O> {
141 let out = var(init);
142 self.read_impl(
143 absolutize_or_pass(file.into()),
144 out.as_any().downgrade(),
145 Box::new(move |r| read(r).map(BoxAnyVarValue::new)),
146 );
147 out.read_only()
148 }
149 fn read_impl(
150 &self,
151 file: PathBuf,
152 out: WeakAnyVar,
153 read: Box<dyn FnMut(io::Result<WatchFile>) -> Option<BoxAnyVarValue> + Send + 'static>,
154 ) {
155 let _handle = WATCHER.watch(file.clone());
156 struct Data {
157 file: PathBuf,
158 read: Mutex<Box<dyn FnMut(io::Result<WatchFile>) -> Option<BoxAnyVarValue> + Send + 'static>>,
159 }
160 let data = Arc::new(Data {
161 file,
162 read: Mutex::new(read),
163 });
164 fn spawn_read(data: &Arc<Data>, out: AnyVar) {
165 zng_task::spawn_wait(clmv!(data, || {
166 if let Some(r) = data.read.lock()(WatchFile::open(&data.file)) {
167 out.modify(clmv!(data, |o| {
169 if o.set(r) {
170 o.push_tag(WatcherSyncWriteNote(data.file.clone()));
172 }
173 }));
174 }
175 }));
176 }
177 if data.file.exists() {
178 spawn_read(&data, out.upgrade().unwrap());
180 }
181 FS_CHANGES_EVENT
182 .hook(move |args| {
183 let _hold = &_handle;
184
185 if let Some(out) = out.upgrade() {
186 if has_sync_changes(args, &data.file) {
187 spawn_read(&data, out);
189 }
190 true
191 } else {
192 false
194 }
195 })
196 .perm();
197 }
198
199 pub fn read_status<O, S, E>(
207 &self,
208 file: impl Into<PathBuf>,
209 init: O,
210 mut read: impl FnMut(io::Result<WatchFile>) -> Result<Option<O>, E> + Send + 'static,
211 ) -> (Var<O>, Var<S>)
212 where
213 O: VarValue,
214 S: WatcherReadStatus<E>,
215 E: Any,
216 {
217 let out = var(init);
218 let status = var(S::idle());
219 self.read_status_impl(
220 absolutize_or_pass(file.into()),
221 out.as_any().downgrade(),
222 status.as_any().clone(),
223 Box::new(move |r| match read(r) {
224 Ok(Some(r)) => Ok(Some(BoxAnyVarValue::new(r))),
225 Ok(None) => Ok(None),
226 Err(e) => Err(Box::new(e)),
227 }),
228 AnyWatcherReadStatus::new::<E, S>(),
229 );
230 (out.read_only(), status.read_only())
231 }
232 fn read_status_impl(
233 &self,
234 file: PathBuf,
235 out: WeakAnyVar,
236 status: AnyVar,
237 read: Box<dyn FnMut(io::Result<WatchFile>) -> Result<Option<BoxAnyVarValue>, Box<dyn Any>> + Send + 'static>,
238 status_mtds: AnyWatcherReadStatus,
239 ) {
240 let _handle = WATCHER.watch(file.clone());
241 struct Data {
242 file: PathBuf,
243 read: Mutex<Box<dyn FnMut(io::Result<WatchFile>) -> Result<Option<BoxAnyVarValue>, Box<dyn Any>> + Send + 'static>>,
244 status: AnyVar,
245 status_mtds: AnyWatcherReadStatus,
246 }
247 let data = Arc::new(Data {
248 file,
249 read: Mutex::new(read),
250 status,
251 status_mtds,
252 });
253 fn spawn_read(data: &Arc<Data>, out: AnyVar) {
254 data.status.set((data.status_mtds.reading)());
255 zng_task::spawn_wait(clmv!(data, || {
256 let mut read = match data.read.try_lock() {
257 Some(l) => l,
258 None => return, };
260 match read(WatchFile::open(&data.file)) {
261 Ok(u) => {
262 if let Some(r) = u {
263 out.modify(clmv!(data, |o| {
265 if o.set(r) {
266 o.push_tag(WatcherSyncWriteNote(data.file.clone()));
268 }
269 }));
270 }
271 data.status.set((data.status_mtds.idle)());
272 }
273 Err(e) => {
274 data.status.set((data.status_mtds.read_error)(Box::new(e)));
276 }
277 }
278 }));
279 }
280 if data.file.exists() {
281 spawn_read(&data, out.upgrade().unwrap());
283 }
284 FS_CHANGES_EVENT
285 .hook(move |args| {
286 let _hold = &_handle;
287 if let Some(out) = out.upgrade() {
288 if has_sync_changes(args, &data.file) {
289 spawn_read(&data, out);
291 }
292 true
293 } else {
294 false
296 }
297 })
298 .perm();
299 }
300
301 pub fn read_dir<O: VarValue>(
313 &self,
314 dir: impl Into<PathBuf>,
315 recursive: bool,
316 init: O,
317 mut read: impl FnMut(walkdir::WalkDir) -> Option<O> + Send + 'static,
318 ) -> Var<O> {
319 let out = var(init);
320 self.read_dir_impl(
321 absolutize_or_pass(dir.into()),
322 out.as_any().downgrade(),
323 recursive,
324 Box::new(move |r| read(r).map(BoxAnyVarValue::new)),
325 );
326 out.read_only()
327 }
328 fn read_dir_impl(
329 &self,
330 dir: PathBuf,
331 out: WeakAnyVar,
332 recursive: bool,
333 read: Box<dyn FnMut(walkdir::WalkDir) -> Option<BoxAnyVarValue> + Send + 'static>,
334 ) {
335 let _handle = WATCHER.watch_dir(dir.clone(), recursive);
336 struct Data {
337 dir: PathBuf,
338 read: Mutex<Box<dyn FnMut(walkdir::WalkDir) -> Option<BoxAnyVarValue> + Send + 'static>>,
339 }
340 let data = Arc::new(Data {
341 dir,
342 read: Mutex::new(read),
343 });
344 fn spawn_read(data: &Arc<Data>, recursive: bool, out: AnyVar) {
345 zng_task::spawn_wait(clmv!(data, || {
346 let dir = if recursive {
347 walkdir::WalkDir::new(&data.dir).min_depth(1)
348 } else {
349 walkdir::WalkDir::new(&data.dir).min_depth(1).max_depth(1)
350 };
351 if let Some(r) = data.read.lock()(dir) {
352 out.modify(clmv!(data, |o| {
354 if o.set(r) {
355 o.push_tag(WatcherSyncWriteNote(data.dir.clone()));
357 }
358 }));
359 }
360 }));
361 }
362 if data.dir.exists() {
363 spawn_read(&data, recursive, out.upgrade().unwrap());
365 }
366 FS_CHANGES_EVENT
367 .hook(move |args| {
368 let _hold = &_handle;
369 if let Some(out) = out.upgrade() {
370 if has_sync_changes(args, &data.dir) {
371 spawn_read(&data, recursive, out);
373 }
374 true
375 } else {
376 false
378 }
379 })
380 .perm();
381 }
382
383 pub fn read_dir_status<O, S, E>(
391 &self,
392 dir: impl Into<PathBuf>,
393 recursive: bool,
394 init: O,
395 mut read: impl FnMut(walkdir::WalkDir) -> Result<Option<O>, E> + Send + 'static,
396 ) -> (Var<O>, Var<S>)
397 where
398 O: VarValue,
399 S: WatcherReadStatus<E>,
400 E: Any,
401 {
402 let out = var(init);
403 let status = var(S::idle());
404
405 self.read_dir_status_impl(
406 absolutize_or_pass(dir.into()),
407 out.as_any().downgrade(),
408 status.as_any().clone(),
409 recursive,
410 Box::new(move |r| match read(r) {
411 Ok(Some(r)) => Ok(Some(BoxAnyVarValue::new(r))),
412 Ok(None) => Ok(None),
413 Err(e) => Err(Box::new(e)),
414 }),
415 AnyWatcherReadStatus::new::<E, S>(),
416 );
417 (out.read_only(), status.read_only())
418 }
419 fn read_dir_status_impl(
420 &self,
421 dir: PathBuf,
422 out: WeakAnyVar,
423 status: AnyVar,
424 recursive: bool,
425 read: Box<dyn FnMut(walkdir::WalkDir) -> Result<Option<BoxAnyVarValue>, Box<dyn Any>> + Send + 'static>,
426 status_mtds: AnyWatcherReadStatus,
427 ) {
428 let _handle = WATCHER.watch_dir(dir.clone(), recursive);
429 struct Data {
430 dir: PathBuf,
431 read: Mutex<Box<dyn FnMut(walkdir::WalkDir) -> Result<Option<BoxAnyVarValue>, Box<dyn Any>> + Send + 'static>>,
432 status: AnyVar,
433 status_mtds: AnyWatcherReadStatus,
434 }
435 let data = Arc::new(Data {
436 dir,
437 read: Mutex::new(read),
438 status,
439 status_mtds,
440 });
441 fn spawn_read(data: &Arc<Data>, recursive: bool, out: AnyVar) {
442 data.status.set((data.status_mtds.reading)());
443 zng_task::spawn_wait(clmv!(data, || {
444 let dir = if recursive {
445 walkdir::WalkDir::new(&data.dir).min_depth(1)
446 } else {
447 walkdir::WalkDir::new(&data.dir).min_depth(1).max_depth(1)
448 };
449 match data.read.lock()(dir) {
450 Ok(u) => {
451 if let Some(r) = u {
452 out.modify(clmv!(data, |o| {
454 if o.set(r) {
455 o.push_tag(WatcherSyncWriteNote(data.dir.clone()));
457 }
458 }));
459 }
460 data.status.set((data.status_mtds.idle)());
461 }
462 Err(e) => {
463 data.status.set((data.status_mtds.read_error)(Box::new(e)));
464 }
465 }
466 }));
467 }
468 if data.dir.exists() {
469 spawn_read(&data, recursive, out.upgrade().unwrap());
470 }
471 FS_CHANGES_EVENT
472 .hook(move |args| {
473 let _hold = &_handle;
474 if let Some(out) = out.upgrade() {
475 if has_sync_changes(args, &data.dir) {
476 spawn_read(&data, recursive, out);
478 }
479 true
480 } else {
481 false
483 }
484 })
485 .perm();
486 }
487
488 pub fn sync<O: VarValue>(
543 &self,
544 file: impl Into<PathBuf>,
545 init: O,
546 mut read: impl FnMut(io::Result<WatchFile>) -> Option<O> + Send + 'static,
547 mut write: impl FnMut(O, io::Result<WriteFile>) + Send + 'static,
548 ) -> Var<O> {
549 let out = var(init);
550 self.sync_impl(
551 absolutize_or_pass(file.into()),
552 out.as_any().clone(),
553 Box::new(move |r| read(r).map(BoxAnyVarValue::new)),
554 Box::new(move |o, r| write(o.downcast().unwrap(), r)),
555 );
556 out
557 }
558 fn sync_impl(
559 &self,
560 file: PathBuf,
561 out: AnyVar,
562 read: Box<dyn FnMut(io::Result<WatchFile>) -> Option<BoxAnyVarValue> + Send + 'static>,
563 write: Box<dyn FnMut(BoxAnyVarValue, io::Result<WriteFile>) + Send + 'static>,
564 ) {
565 self.read_impl(file.clone(), out.downgrade(), read);
566
567 struct DataWrite {
568 f: Box<dyn FnMut(BoxAnyVarValue, io::Result<WriteFile>) + Send + 'static>,
569 last: DInstant,
570 }
571 struct Data {
572 note: Arc<WatcherSyncWriteNote>,
573 write: Mutex<DataWrite>,
574 write_value: Mutex<Option<BoxAnyVarValue>>,
575 debounce_flush: Arc<SyncFlushLock>,
576 }
577 let data = Arc::new(Data {
578 note: Arc::new(WatcherSyncWriteNote(file)),
579 write: Mutex::new(DataWrite {
580 f: write,
581 last: DInstant::EPOCH,
582 }),
583 write_value: Mutex::new(None),
584 debounce_flush: Arc::new(SyncFlushLock::new()),
585 });
586 WATCHER_SV.write().push_sync_flush(&data.debounce_flush);
587
588 out.hook(move |a| {
589 for tag in a.downcast_tags::<WatcherSyncWriteNote>() {
591 if *tag == *data.note {
592 return true;
593 }
594 }
595
596 let mut d = data.write_value.lock();
598 let is_running = d.is_some() || data.write.try_lock().is_none();
599 *d = Some(a.value().clone_boxed());
600 if is_running {
601 return true;
602 }
603 zng_task::spawn_wait(clmv!(data, || {
604 let mut write = data.write.lock();
606 let _guard = data
608 .debounce_flush
609 .begin_write(WATCHER_SV.read().sync_debounce.get().saturating_sub(write.last.elapsed()));
610
611 let _tag = WATCHER.annotate(data.note.clone());
613 while let Some(value) = data.write_value.lock().take() {
614 (write.f)(value, WriteFile::open(data.note.0.clone()));
615 }
616
617 write.last = INSTANT.now();
618 }));
619
620 true
621 })
622 .perm();
623 }
624
625 pub fn sync_status<O, S, ER, EW>(
637 &self,
638 file: impl Into<PathBuf>,
639 init: O,
640 mut read: impl FnMut(io::Result<WatchFile>) -> Result<Option<O>, ER> + Send + 'static,
641 mut write: impl FnMut(O, io::Result<WriteFile>) -> Result<(), EW> + Send + 'static,
642 ) -> (Var<O>, Var<S>)
643 where
644 O: VarValue,
645 S: WatcherSyncStatus<ER, EW>,
646 ER: Any,
647 EW: Any,
648 {
649 let out = var(init);
650 let status = var(S::idle());
651 self.sync_status_impl(
652 absolutize_or_pass(file.into()),
653 out.as_any().clone(),
654 status.as_any().clone(),
655 Box::new(move |r| match read(r) {
656 Ok(Some(r)) => Ok(Some(BoxAnyVarValue::new(r))),
657 Ok(None) => Ok(None),
658 Err(e) => Err(Box::new(e)),
659 }),
660 AnyWatcherReadStatus::new::<ER, S>(),
661 Box::new(move |o, r| match write(o.downcast().unwrap(), r) {
662 Ok(()) => Ok(()),
663 Err(e) => Err(Box::new(e)),
664 }),
665 AnyWatcherWriteStatus::new::<ER, EW, S>(),
666 );
667 (out, status.read_only())
668 }
669 #[allow(clippy::too_many_arguments)]
670 fn sync_status_impl(
671 &self,
672 file: PathBuf,
673 out: AnyVar,
674 status: AnyVar,
675 read: Box<dyn FnMut(io::Result<WatchFile>) -> Result<Option<BoxAnyVarValue>, Box<dyn Any>> + Send + 'static>,
676 status_mtds: AnyWatcherReadStatus,
677 write: Box<dyn FnMut(BoxAnyVarValue, io::Result<WriteFile>) -> Result<(), Box<dyn Any>> + Send + 'static>,
678 status_w_mtds: AnyWatcherWriteStatus,
679 ) {
680 self.read_status_impl(file.clone(), out.downgrade(), status.clone(), read, status_mtds);
681
682 struct DataWrite {
683 f: Box<dyn FnMut(BoxAnyVarValue, io::Result<WriteFile>) -> Result<(), Box<dyn Any>> + Send + 'static>,
684 last: DInstant,
685 }
686 struct Data {
687 note: Arc<WatcherSyncWriteNote>,
688 write: Mutex<DataWrite>,
689 write_value: Mutex<Option<BoxAnyVarValue>>,
690 debounce_flush: Arc<SyncFlushLock>,
691 status: AnyVar,
692 status_w_mtds: AnyWatcherWriteStatus,
693 }
694 let data = Arc::new(Data {
695 note: Arc::new(WatcherSyncWriteNote(file)),
696 write: Mutex::new(DataWrite {
697 f: write,
698 last: DInstant::EPOCH,
699 }),
700 write_value: Mutex::new(None),
701 debounce_flush: Arc::new(SyncFlushLock::new()),
702 status,
703 status_w_mtds,
704 });
705 WATCHER_SV.write().push_sync_flush(&data.debounce_flush);
706
707 out.hook(move |a| {
708 for tag in a.downcast_tags::<WatcherSyncWriteNote>() {
710 if *tag == *data.note {
711 return true;
712 }
713 }
714
715 let mut d = data.write_value.lock();
717 let is_running = d.is_some() || data.write.try_lock().is_none();
718 *d = Some(a.value().clone_boxed());
719 if is_running {
720 return true;
721 }
722 data.status.set((data.status_w_mtds.writing)());
723 zng_task::spawn_wait(clmv!(data, || {
724 let mut write = data.write.lock();
726 let debounce = WATCHER_SV.read().sync_debounce.get().saturating_sub(write.last.elapsed());
728 let _guard = data.debounce_flush.begin_write(debounce);
729
730 let _tag = WATCHER.annotate(data.note.clone());
732
733 while let Some(value) = data.write_value.lock().take() {
734 match (write.f)(value, WriteFile::open(data.note.0.clone())) {
735 Ok(()) => data.status.set((data.status_w_mtds.idle)()),
736 Err(e) => data.status.set((data.status_w_mtds.write_error)(e)),
737 }
738 }
739
740 write.last = INSTANT.now();
741 }));
742
743 true
744 })
745 .perm();
746 }
747
748 pub fn on_file_changed(&self, file: impl Into<PathBuf>, ignore_propagation: bool, handler: Handler<FsChangesArgs>) -> VarHandle {
758 let file = absolutize_or_pass(file.into());
759 let handle = self.watch(file.clone());
760 FS_CHANGES_EVENT.on_event(
761 ignore_propagation,
762 handler.filtered(move |args| {
763 let _handle = &handle;
764 args.events_for_path(&file).next().is_some()
765 }),
766 )
767 }
768
769 pub fn on_dir_changed(
779 &self,
780 dir: impl Into<PathBuf>,
781 recursive: bool,
782 ignore_propagation: bool,
783 handler: Handler<FsChangesArgs>,
784 ) -> VarHandle {
785 let dir = absolutize_or_pass(dir.into());
786 let handle = self.watch_dir(dir.clone(), recursive);
787 FS_CHANGES_EVENT.on_event(
788 ignore_propagation,
789 handler.filtered(move |args| {
790 let _handle = &handle;
791 args.events_for_path(&dir).next().is_some()
792 }),
793 )
794 }
795
796 pub fn annotate(&self, note: Arc<dyn FsChangeNote>) -> FsChangeNoteHandle {
803 WATCHER_SV.write().annotate(note)
804 }
805}
806
807fn has_sync_changes(args: &FsChangesArgs, path: &Path) -> bool {
808 let mut any = false;
809 for change in args.changes_for_path(path) {
810 for note in change.notes::<WatcherSyncWriteNote>() {
811 if note.as_path() == path {
812 return false;
813 }
814 }
815 any = true;
816 }
817 any
818}
819
820fn absolutize_or_pass(path: PathBuf) -> PathBuf {
821 match path.absolutize() {
822 Ok(p) if p != path => p.to_path_buf(),
823 _ => path,
824 }
825}
826
827pub trait WatcherSyncStatus<ER = io::Error, EW = io::Error>: WatcherReadStatus<ER> {
831 fn writing() -> Self;
833 fn write_error(e: EW) -> Self;
835}
836
837pub trait WatcherReadStatus<ER = io::Error>: VarValue + PartialEq {
839 fn idle() -> Self;
841 fn reading() -> Self;
843 fn read_error(e: ER) -> Self;
845}
846
847struct AnyWatcherReadStatus {
848 idle: fn() -> BoxAnyVarValue,
849 reading: fn() -> BoxAnyVarValue,
850 read_error: fn(Box<dyn Any>) -> BoxAnyVarValue,
851}
852impl AnyWatcherReadStatus {
853 fn new<ER: Any, S: WatcherReadStatus<ER> + VarValue>() -> Self {
854 fn idle_impl<ER, S: WatcherReadStatus<ER>>() -> BoxAnyVarValue {
855 BoxAnyVarValue::new(S::idle())
856 }
857 fn reading_impl<ER, S: WatcherReadStatus<ER>>() -> BoxAnyVarValue {
858 BoxAnyVarValue::new(S::reading())
859 }
860 fn read_error_impl<ER: Any, S: WatcherReadStatus<ER> + VarValue>(e: Box<dyn Any>) -> BoxAnyVarValue {
861 BoxAnyVarValue::new(S::read_error(*e.downcast::<ER>().unwrap()))
862 }
863 Self {
864 idle: idle_impl::<ER, S>,
865 reading: reading_impl::<ER, S>,
866 read_error: read_error_impl::<ER, S>,
867 }
868 }
869}
870
871struct AnyWatcherWriteStatus {
872 idle: fn() -> BoxAnyVarValue,
873 writing: fn() -> BoxAnyVarValue,
874 write_error: fn(Box<dyn Any>) -> BoxAnyVarValue,
875}
876impl AnyWatcherWriteStatus {
877 fn new<ER, EW: Any, S: WatcherSyncStatus<ER, EW> + VarValue>() -> Self {
878 fn idle_impl<ER, S: WatcherReadStatus<ER>>() -> BoxAnyVarValue {
879 BoxAnyVarValue::new(S::idle())
880 }
881 fn writing_impl<ER, EW, S: WatcherSyncStatus<ER, EW>>() -> BoxAnyVarValue {
882 BoxAnyVarValue::new(S::writing())
883 }
884 fn write_error_impl<ER, EW: Any, S: WatcherSyncStatus<ER, EW> + VarValue>(e: Box<dyn Any>) -> BoxAnyVarValue {
885 BoxAnyVarValue::new(S::write_error(*e.downcast::<EW>().unwrap()))
886 }
887 Self {
888 idle: idle_impl::<ER, S>,
889 writing: writing_impl::<ER, EW, S>,
890 write_error: write_error_impl::<ER, EW, S>,
891 }
892 }
893}
894
895#[derive(Debug)]
901pub struct WatchFile(fs::File);
902impl WatchFile {
903 pub fn open(file: impl AsRef<Path>) -> io::Result<Self> {
905 Self::try_open_non_empty(file.as_ref(), true)
906 }
907 fn try_open_non_empty(path: &Path, retry: bool) -> io::Result<Self> {
908 let file = fs::File::open(path)?;
909
910 if retry && file.metadata()?.len() == 0 {
911 let _ = file;
913 std::thread::sleep(5.ms());
914 return Self::try_open_non_empty(path, false);
915 }
916
917 lock_shared(&file, Duration::from_secs(10))?;
918 Ok(Self(file))
919 }
920
921 pub fn text(&mut self) -> io::Result<Txt> {
923 self.string().map(Txt::from)
924 }
925
926 pub fn string(&mut self) -> io::Result<String> {
928 use std::io::Read;
929 let mut s = String::new();
930 self.0.read_to_string(&mut s)?;
931 Ok(s)
932 }
933
934 #[cfg(feature = "json")]
936 pub fn json<O>(&mut self) -> serde_json::Result<O>
937 where
938 O: serde::de::DeserializeOwned,
939 {
940 serde_json::from_reader(io::BufReader::new(&mut self.0))
941 }
942
943 #[cfg(feature = "toml")]
945 pub fn toml<O>(&mut self) -> io::Result<O>
946 where
947 O: serde::de::DeserializeOwned,
948 {
949 use std::io::Read;
950 let mut buf = io::BufReader::new(&mut self.0);
951
952 let mut toml_str = String::new();
953 buf.read_to_string(&mut toml_str)?;
954
955 toml::de::from_str(&toml_str).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
956 }
957
958 #[cfg(feature = "ron")]
960 pub fn ron<O>(&mut self) -> Result<O, ron::de::SpannedError>
961 where
962 O: serde::de::DeserializeOwned,
963 {
964 ron::de::from_reader(io::BufReader::new(&mut self.0))
965 }
966
967 #[cfg(feature = "yaml")]
969 pub fn yaml<O>(&mut self) -> serde_yaml::Result<O>
970 where
971 O: serde::de::DeserializeOwned,
972 {
973 serde_yaml::from_reader(io::BufReader::new(&mut self.0))
974 }
975
976 pub fn parse<O: std::str::FromStr>(&mut self) -> Result<O, WatchFileParseError<O::Err>> {
978 use std::io::Read;
979 let mut s = String::new();
980 self.0.read_to_string(&mut s)?;
981 O::from_str(&s).map_err(WatchFileParseError::Parse)
982 }
983}
984impl ops::Deref for WatchFile {
985 type Target = fs::File;
986
987 fn deref(&self) -> &Self::Target {
988 &self.0
989 }
990}
991impl ops::DerefMut for WatchFile {
992 fn deref_mut(&mut self) -> &mut Self::Target {
993 &mut self.0
994 }
995}
996
997const TRANSACTION_GUID: &str = "6eIw3bYMS0uKaQMkTIQacQ";
998const TRANSACTION_LOCK_EXT: &str = "6eIw3bYMS0uKaQMkTIQacQ-lock.tmp";
999
1000pub struct WriteFile {
1020 temp_file: Option<fs::File>,
1021 actual_file: Option<fs::File>,
1022 transaction_lock: Option<fs::File>,
1023
1024 actual_path: PathBuf,
1025 temp_path: PathBuf,
1026 transaction_path: PathBuf,
1027
1028 cleaned: bool,
1029}
1030impl Drop for WriteFile {
1031 fn drop(&mut self) {
1032 if !self.cleaned {
1033 tracing::error!("dropped sync write file without commit or cancel");
1034 self.clean();
1035 }
1036 }
1037}
1038impl ops::Deref for WriteFile {
1039 type Target = fs::File;
1040
1041 fn deref(&self) -> &Self::Target {
1042 self.temp_file.as_ref().unwrap()
1043 }
1044}
1045impl ops::DerefMut for WriteFile {
1046 fn deref_mut(&mut self) -> &mut Self::Target {
1047 self.temp_file.as_mut().unwrap()
1048 }
1049}
1050impl WriteFile {
1051 pub fn open(path: PathBuf) -> io::Result<Self> {
1053 let actual_path = path.absolutize()?.into_owned();
1054 if !actual_path.exists()
1055 && let Some(parent) = actual_path.parent()
1056 {
1057 std::fs::create_dir_all(parent)?;
1058 }
1059
1060 let hidden_name = match actual_path.file_name() {
1061 Some(n) => format!(".{}", n.to_string_lossy()),
1062 None => return Err(io::Error::new(io::ErrorKind::InvalidInput, "expected file name")),
1063 };
1064
1065 let transaction_path = actual_path.with_file_name(format!("{hidden_name}.{TRANSACTION_LOCK_EXT}"));
1066 let transaction_lock = fs::OpenOptions::new()
1067 .create(true)
1068 .truncate(true)
1069 .write(true)
1070 .open(&transaction_path)?;
1071
1072 const TIMEOUT: Duration = Duration::from_secs(10);
1073
1074 lock_exclusive(&transaction_lock, TIMEOUT)?;
1075
1076 let actual_file = fs::OpenOptions::new().write(true).create(true).truncate(false).open(&actual_path)?;
1077 lock_exclusive(&actual_file, TIMEOUT)?;
1078
1079 let mut n = 0;
1080 let mut temp_path = actual_path.with_file_name(format!("{hidden_name}.{TRANSACTION_GUID}-{n}.tmp"));
1081 let temp_file = loop {
1082 if let Ok(f) = fs::OpenOptions::new().write(true).create(true).truncate(true).open(&temp_path)
1083 && f.try_lock().is_ok()
1084 {
1085 break f;
1086 }
1087
1088 n += 1;
1089 temp_path = actual_path.with_file_name(format!("{hidden_name}.{TRANSACTION_GUID}-{n}.tmp"));
1090 n += 1;
1091 if n > 1000 {
1092 return Err(io::Error::new(io::ErrorKind::AlreadyExists, "cannot create temporary file"));
1093 }
1094 };
1095
1096 Ok(Self {
1097 actual_file: Some(actual_file),
1098 temp_file: Some(temp_file),
1099 transaction_lock: Some(transaction_lock),
1100 actual_path,
1101 temp_path,
1102 transaction_path,
1103 cleaned: false,
1104 })
1105 }
1106
1107 pub fn write_text(&mut self, txt: &str) -> io::Result<()> {
1109 self.write_all(txt.as_bytes())
1110 }
1111
1112 #[cfg(feature = "json")]
1116 pub fn write_json<O: serde::Serialize>(&mut self, value: &O, pretty: bool) -> io::Result<()> {
1117 let mut buf = io::BufWriter::new(ops::DerefMut::deref_mut(self));
1118 if pretty {
1119 serde_json::to_writer_pretty(&mut buf, value)?;
1120 } else {
1121 serde_json::to_writer(&mut buf, value)?;
1122 }
1123 buf.flush()
1124 }
1125
1126 #[cfg(feature = "toml")]
1130 pub fn write_toml<O: serde::Serialize>(&mut self, value: &O, pretty: bool) -> io::Result<()> {
1131 let toml = if pretty {
1132 toml::ser::to_string_pretty(value)
1133 } else {
1134 toml::ser::to_string(value)
1135 }
1136 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
1137
1138 self.write_all(toml.as_bytes())
1139 }
1140
1141 #[cfg(feature = "ron")]
1145 pub fn write_ron<O: serde::Serialize>(&mut self, value: &O, pretty: bool) -> io::Result<()> {
1146 let buf = io::BufWriter::new(ops::DerefMut::deref_mut(self));
1147 struct Ffs<'a> {
1148 w: io::BufWriter<&'a mut fs::File>,
1149 }
1150 impl fmt::Write for Ffs<'_> {
1151 fn write_str(&mut self, s: &str) -> fmt::Result {
1152 self.w.write_all(s.as_bytes()).map_err(|_| fmt::Error)
1153 }
1154 }
1155 let mut buf = Ffs { w: buf };
1156 if pretty {
1157 ron::ser::to_writer_pretty(&mut buf, value, Default::default()).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
1158 } else {
1159 ron::ser::to_writer(&mut buf, value).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
1160 }
1161 buf.w.flush()
1162 }
1163
1164 #[cfg(feature = "yaml")]
1166 pub fn write_yaml<O: serde::Serialize>(&mut self, value: &O) -> io::Result<()> {
1167 let mut buf = io::BufWriter::new(ops::DerefMut::deref_mut(self));
1168 serde_yaml::to_writer(&mut buf, value).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
1169 buf.flush()
1170 }
1171
1172 pub fn commit(mut self) -> io::Result<()> {
1174 let r = self.replace_actual();
1175 self.clean();
1176 r
1177 }
1178
1179 pub fn cancel(mut self) {
1181 self.clean();
1182 }
1183
1184 fn replace_actual(&mut self) -> io::Result<()> {
1185 let mut temp_file = self.temp_file.take().unwrap();
1186 temp_file.flush()?;
1187 temp_file.sync_all()?;
1188
1189 unlock_ok(&temp_file).unwrap();
1190 drop(temp_file);
1191
1192 let actual_file = self.actual_file.take().unwrap();
1193 unlock_ok(&actual_file)?;
1194 drop(actual_file);
1195
1196 let mut retries = 0;
1197 loop {
1198 match fs::rename(&self.temp_path, &self.actual_path) {
1200 Ok(()) => {
1201 break;
1202 }
1203 Err(e) => match e.kind() {
1204 io::ErrorKind::PermissionDenied => {
1205 if retries == 5 {
1206 return Err(e);
1210 } else if retries > 0 {
1211 std::thread::sleep(30.ms());
1223 } else {
1224 match std::fs::File::options().write(true).open(&self.actual_path) {
1230 Ok(f) => {
1231 if lock_exclusive(&f, 10.secs()).is_ok() {
1232 let _ = unlock_ok(&f);
1234 }
1235 }
1236 Err(e) => match e.kind() {
1237 io::ErrorKind::NotFound => {
1238 continue;
1240 }
1241 _ => {
1242 std::thread::sleep(30.ms());
1244 }
1245 },
1246 }
1247 }
1248
1249 retries += 1;
1250 }
1251 _ => return Err(e),
1252 },
1253 }
1254 }
1255
1256 Ok(())
1257 }
1258
1259 fn clean(&mut self) {
1260 self.cleaned = true;
1261
1262 if let Some(tmp) = self.temp_file.take() {
1263 let _ = tmp.unlock();
1264 }
1265 if let Err(e) = fs::remove_file(&self.temp_path) {
1266 tracing::debug!("failed to cleanup temp file, {e}")
1267 }
1268
1269 if let Some(file) = self.actual_file.take() {
1270 let _ = file.unlock();
1271 }
1272
1273 let transaction = self.transaction_lock.take().unwrap();
1274 let _ = transaction.unlock();
1275 let _ = fs::remove_file(&self.transaction_path);
1276 }
1277}
1278
1279#[derive(Debug)]
1281#[non_exhaustive]
1282pub enum WatchFileParseError<E> {
1283 Io(io::Error),
1285 Parse(E),
1287}
1288impl<E> From<io::Error> for WatchFileParseError<E> {
1289 fn from(value: io::Error) -> Self {
1290 Self::Io(value)
1291 }
1292}
1293impl<E: fmt::Display> fmt::Display for WatchFileParseError<E> {
1294 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1295 match self {
1296 WatchFileParseError::Io(e) => write!(f, "read error, {e}"),
1297 WatchFileParseError::Parse(e) => write!(f, "parse error, {e}"),
1298 }
1299 }
1300}
1301impl<E: std::error::Error + 'static> std::error::Error for WatchFileParseError<E> {
1302 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
1303 match self {
1304 WatchFileParseError::Io(e) => Some(e),
1305 WatchFileParseError::Parse(e) => Some(e),
1306 }
1307 }
1308}
1309
1310#[diagnostic::on_unimplemented(note = "`FsChangeNote` is implemented for all `T: Debug + Any + Send + Sync`")]
1314pub trait FsChangeNote: fmt::Debug + std::any::Any + Send + Sync {
1315 fn as_any(&self) -> &dyn std::any::Any;
1317}
1318impl<T: fmt::Debug + std::any::Any + Send + Sync> FsChangeNote for T {
1319 fn as_any(&self) -> &dyn std::any::Any {
1320 self
1321 }
1322}
1323
1324#[derive(Clone)]
1328#[must_use = "the note is removed when the handle is dropped"]
1329pub struct FsChangeNoteHandle(#[expect(dead_code)] Arc<Arc<dyn FsChangeNote>>);
1330
1331#[derive(Debug, PartialEq, Eq, Clone)]
1337pub struct WatcherSyncWriteNote(PathBuf);
1338impl WatcherSyncWriteNote {
1339 pub fn as_path(&self) -> &Path {
1341 self
1342 }
1343}
1344impl ops::Deref for WatcherSyncWriteNote {
1345 type Target = Path;
1346
1347 fn deref(&self) -> &Self::Target {
1348 self.0.as_path()
1349 }
1350}
1351
1352pub mod fs_event {
1358 pub use notify::event::{
1359 AccessKind, AccessMode, CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind, RenameMode,
1360 };
1361 pub use notify::{Error, ErrorKind};
1362}
1363
1364#[derive(Debug, Clone)]
1366#[non_exhaustive]
1367pub struct FsChange {
1368 pub notes: Vec<Arc<dyn FsChangeNote>>,
1372
1373 pub event: Result<fs_event::Event, Arc<fs_event::Error>>,
1375}
1376impl PartialEq for FsChange {
1377 fn eq(&self, other: &Self) -> bool {
1378 self.notes.len() == other.notes.len()
1379 && self.notes.iter().zip(other.notes.iter()).all(|(a, b)| Arc::ptr_eq(a, b))
1380 && match (&self.event, &other.event) {
1381 (Ok(a), Ok(b)) => a == b,
1382 (Err(a), Err(b)) => Arc::ptr_eq(a, b),
1383 _ => false,
1384 }
1385 }
1386}
1387impl FsChange {
1388 pub fn is_for_path(&self, path: &Path) -> bool {
1390 if let Ok(ev) = &self.event {
1391 return ev.paths.iter().any(|p| p.starts_with(path));
1392 }
1393 false
1394 }
1395
1396 pub fn is_for_glob(&self, pattern: &glob::Pattern) -> bool {
1398 if let Ok(ev) = &self.event {
1399 return ev.paths.iter().any(|p| pattern.matches_path(p));
1400 }
1401 false
1402 }
1403
1404 pub fn notes<T: FsChangeNote>(&self) -> impl Iterator<Item = &T> {
1406 self.notes.iter().filter_map(|n| FsChangeNote::as_any(&**n).downcast_ref::<T>())
1407 }
1408}
1409
1410event_args! {
1411 pub struct FsChangesArgs {
1413 pub changes: Arc<Vec<FsChange>>,
1415
1416 ..
1417
1418 fn is_in_target(&self, _id: WidgetId) -> bool {
1420 false
1421 }
1422 }
1423}
1424impl FsChangesArgs {
1425 pub fn events(&self) -> impl Iterator<Item = &fs_event::Event> + '_ {
1427 self.changes.iter().filter_map(|r| r.event.as_ref().ok())
1428 }
1429
1430 pub fn errors(&self) -> impl Iterator<Item = ¬ify::Error> + '_ {
1432 self.changes.iter().filter_map(|r| r.event.as_ref().err().map(|e| &**e))
1433 }
1434
1435 pub fn rescan(&self) -> bool {
1444 self.events().any(|e| e.need_rescan())
1445 }
1446
1447 pub fn changes_for(&self, glob: &str) -> Result<impl Iterator<Item = &FsChange> + '_, glob::PatternError> {
1449 let glob = glob::Pattern::new(glob)?;
1450 Ok(self.changes.iter().filter(move |c| c.is_for_glob(&glob)))
1451 }
1452
1453 pub fn changes_for_path<'a>(&'a self, path: &'a Path) -> impl Iterator<Item = &'a FsChange> + 'a {
1455 self.changes.iter().filter(move |c| c.is_for_path(path))
1456 }
1457
1458 pub fn events_for(&self, glob: &str) -> Result<impl Iterator<Item = &fs_event::Event> + '_, glob::PatternError> {
1460 let glob = glob::Pattern::new(glob)?;
1461 Ok(self.events().filter(move |ev| ev.paths.iter().any(|p| glob.matches_path(p))))
1462 }
1463
1464 pub fn events_for_path<'a>(&'a self, path: &'a Path) -> impl Iterator<Item = &'a fs_event::Event> + 'a {
1466 self.events().filter(move |ev| ev.paths.iter().any(|p| p.starts_with(path)))
1467 }
1468}
1469
1470zng_app::event::event! {
1471 pub static FS_CHANGES_EVENT: FsChangesArgs;
1473}
1474
1475#[derive(Clone)]
1477#[must_use = "the watcher is dropped if the handle is dropped"]
1478pub struct WatcherHandle(Handle<()>);
1479
1480impl WatcherHandle {
1481 pub fn dummy() -> Self {
1483 Self(Handle::dummy(()))
1484 }
1485
1486 pub fn is_permanent(&self) -> bool {
1491 self.0.is_permanent()
1492 }
1493
1494 pub fn force_drop(self) {
1496 self.0.force_drop()
1497 }
1498
1499 pub fn is_dropped(&self) -> bool {
1501 self.0.is_dropped()
1502 }
1503
1504 pub fn perm(self) {
1507 self.0.perm()
1508 }
1509}