zng_ext_config/
sync.rs

1use std::{marker::PhantomData, path::PathBuf};
2
3use zng_clone_move::clmv;
4use zng_ext_fs_watcher::WATCHER;
5use zng_var::Var;
6
7use super::*;
8
9/// Internal representation of configs.
10pub type RawConfigMap = indexmap::IndexMap<ConfigKey, RawConfigValue>;
11
12/// Represents a serializing/encoding backend for [`SyncConfig`].
13pub trait SyncConfigBackend: 'static {
14    /// Read/deserialize raw config from the file.
15    ///
16    /// This method runs in unblocked context.
17    fn read(file: WatchFile) -> io::Result<RawConfigMap>;
18
19    /// Write/serialize raw config to the file.
20    fn write(file: &mut WriteFile, config: &RawConfigMap) -> io::Result<()>;
21}
22
23/// Config source that auto syncs with file.
24///
25/// The [`WATCHER.sync`] is used to synchronize with the file, this type implements the binding
26/// for each key.
27///
28/// [`WATCHER.sync`]: WATCHER::sync
29pub struct SyncConfig<B: SyncConfigBackend> {
30    sync_var: Var<RawConfigMap>,
31    backend: PhantomData<fn() -> B>,
32    status: Var<ConfigStatus>,
33}
34impl<B: SyncConfigBackend> SyncConfig<B> {
35    /// Open the `file` for read/write.
36    pub fn sync(file: impl Into<PathBuf>) -> Self {
37        Self::sync_impl(file.into())
38    }
39    fn sync_impl(file: PathBuf) -> Self {
40        let (sync_var, status) = WATCHER.sync_status::<_, _, ConfigStatusError, ConfigStatusError>(
41            file,
42            RawConfigMap::default(),
43            |r| match (|| B::read(r?))() {
44                Ok(ok) => Ok(Some(ok)),
45                Err(e) => {
46                    tracing::error!("sync config read error, {e:?}");
47                    Err(vec![Arc::new(e)])
48                }
49            },
50            |map, w| {
51                let r = (|| {
52                    let mut w = w?;
53                    B::write(&mut w, &map)?;
54                    w.commit()
55                })();
56                match r {
57                    Ok(()) => Ok(()),
58                    Err(e) => {
59                        tracing::error!("sync config write error, {e:?}");
60                        Err(vec![Arc::new(e)])
61                    }
62                }
63            },
64        );
65
66        Self {
67            sync_var,
68            backend: PhantomData,
69            status,
70        }
71    }
72
73    /// Open the `file` read-only.
74    pub fn read(file: impl Into<PathBuf>) -> Self {
75        Self::read_impl(file.into())
76    }
77    fn read_impl(file: PathBuf) -> Self {
78        let (read_var, status) =
79            WATCHER.read_status::<_, _, ConfigStatusError>(file, RawConfigMap::default(), |r| match (|| B::read(r?))() {
80                Ok(ok) => Ok(Some(ok)),
81                Err(e) => {
82                    tracing::error!("read config error, {e:?}");
83                    Err(vec![Arc::new(e)])
84                }
85            });
86
87        Self {
88            sync_var: read_var,
89            backend: PhantomData,
90            status,
91        }
92    }
93}
94impl<B: SyncConfigBackend> AnyConfig for SyncConfig<B> {
95    fn get_raw(&mut self, key: ConfigKey, default: RawConfigValue, insert: bool) -> Var<RawConfigValue> {
96        // init value
97        let current_raw_value = self.sync_var.with(|m| m.get(&key).cloned());
98        let value_var = match current_raw_value {
99            Some(v) => var(v),
100            None => {
101                if insert {
102                    self.sync_var.modify(clmv!(key, default, |args| {
103                        if !args.contains_key(&key) {
104                            args.insert(key, default);
105                        }
106                    }));
107                }
108                var(default)
109            }
110        };
111
112        self.sync_var
113            .bind_modify_bidi(
114                &value_var,
115                clmv!(key, |v, m| {
116                    if let Some(value) = v.get(&key) {
117                        m.set(value.clone());
118                    }
119                }),
120                move |v, m| match m.get(&key) {
121                    Some(prev) => {
122                        if prev != v {
123                            *m.get_mut(&key).unwrap() = v.clone();
124                        }
125                    }
126                    None => {
127                        m.insert(key.clone(), v.clone());
128                    }
129                },
130            )
131            .perm();
132
133        value_var
134    }
135
136    fn contains_key(&mut self, key: ConfigKey) -> Var<bool> {
137        self.sync_var.map(move |q| q.contains_key(&key))
138    }
139
140    fn status(&self) -> Var<ConfigStatus> {
141        self.status.clone()
142    }
143
144    fn remove(&mut self, key: &ConfigKey) -> bool {
145        let contains = self.sync_var.with(|q| q.contains_key(key));
146        if contains {
147            self.sync_var.modify(clmv!(key, |m| {
148                if m.contains_key(&key) {
149                    m.shift_remove(&key);
150                }
151            }));
152        }
153        contains
154    }
155
156    fn low_memory(&mut self) {}
157}