zng_ext_config/
sync.rs

1use std::{path::PathBuf, sync::atomic::AtomicBool};
2
3use atomic::{Atomic, Ordering};
4use zng_clone_move::clmv;
5use zng_ext_fs_watcher::WATCHER;
6use zng_var::{ReadOnlyArcVar, VARS, VarUpdateId};
7
8use super::*;
9
10/// Config source that auto syncs with file.
11///
12/// The [`WATCHER.sync`] is used to synchronize with the file, this type implements the binding
13/// for each key.
14///
15/// [`WATCHER.sync`]: WATCHER::sync
16pub struct SyncConfig<M: ConfigMap> {
17    sync_var: ArcVar<M>,
18    status: ReadOnlyArcVar<ConfigStatus>,
19    shared: ConfigVars,
20}
21impl<M: ConfigMap> SyncConfig<M> {
22    /// Open write the `file`
23    pub fn sync(file: impl Into<PathBuf>) -> Self {
24        let (sync_var, status) = WATCHER.sync_status::<_, _, ConfigStatusError, ConfigStatusError>(
25            file,
26            M::empty(),
27            |r| match (|| M::read(r?))() {
28                Ok(ok) => Ok(Some(ok)),
29                Err(e) => {
30                    tracing::error!("sync config read error, {e:?}");
31                    Err(vec![Arc::new(e)])
32                }
33            },
34            |map, w| {
35                let r = (|| {
36                    let mut w = w?;
37                    map.write(&mut w)?;
38                    w.commit()
39                })();
40                match r {
41                    Ok(()) => Ok(()),
42                    Err(e) => {
43                        tracing::error!("sync config write error, {e:?}");
44                        Err(vec![Arc::new(e)])
45                    }
46                }
47            },
48        );
49
50        Self {
51            sync_var,
52            status,
53            shared: ConfigVars::default(),
54        }
55    }
56
57    fn get_new_raw(sync_var: &ArcVar<M>, key: ConfigKey, default: RawConfigValue, insert: bool) -> BoxedVar<RawConfigValue> {
58        // init var to already present value, or default.
59        let mut used_default = false;
60        let var = match sync_var.with(|m| ConfigMap::get_raw(m, &key)) {
61            Ok(raw) => {
62                // get ok
63                match raw {
64                    Some(raw) => var(raw),
65                    None => {
66                        used_default = true;
67                        var(default)
68                    }
69                }
70            }
71            Err(e) => {
72                // get error
73                tracing::error!("sync config get({key:?}) error, {e:?}");
74                used_default = true;
75                var(default)
76            }
77        };
78
79        if insert && used_default {
80            var.update();
81        }
82
83        // bind entry var
84
85        // config -> entry
86        let wk_var = var.downgrade();
87        let last_update = Atomic::new(VarUpdateId::never());
88        let request_update = AtomicBool::new(used_default);
89        sync_var
90            .hook(clmv!(key, |map| {
91                let update_id = VARS.update_id();
92                if update_id == last_update.load(Ordering::Relaxed) {
93                    return true;
94                }
95                last_update.store(update_id, Ordering::Relaxed);
96                if let Some(var) = wk_var.upgrade() {
97                    match map.value().get_raw(&key) {
98                        Ok(raw) => {
99                            // get ok
100                            if let Some(raw) = raw {
101                                var.set(raw);
102                                if request_update.swap(false, Ordering::Relaxed) {
103                                    // restored after reset, var can already have the value,
104                                    // but upstream bindings are stale, cause an update.
105                                    var.update();
106                                }
107                            } else {
108                                // backend lost entry but did not report as error, probably a reset.
109                                request_update.store(true, Ordering::Relaxed);
110                            }
111                        }
112                        Err(e) => {
113                            // get error
114                            tracing::error!("sync config get({key:?}) error, {e:?}");
115                        }
116                    }
117                    // retain hook
118                    true
119                } else {
120                    // entry var dropped, drop hook
121                    false
122                }
123            }))
124            .perm();
125
126        // entry -> config
127        let wk_sync_var = sync_var.downgrade();
128        let last_update = Atomic::new(VarUpdateId::never());
129        var.hook(clmv!(|value| {
130            let update_id = VARS.update_id();
131            if update_id == last_update.load(Ordering::Relaxed) {
132                return true;
133            }
134            last_update.store(update_id, Ordering::Relaxed);
135            if let Some(sync_var) = wk_sync_var.upgrade() {
136                let raw = value.value().clone();
137                sync_var.modify(clmv!(key, |m| {
138                    // set, only if actually changed
139                    match ConfigMap::set_raw(m, key.clone(), raw) {
140                        Ok(()) => {
141                            // set ok
142                        }
143                        Err(e) => {
144                            // set error
145                            tracing::error!("sync config set({key:?}) error, {e:?}");
146                        }
147                    }
148                }));
149
150                // retain hook
151                true
152            } else {
153                // config dropped, drop hook
154                false
155            }
156        }))
157        .perm();
158
159        var.boxed()
160    }
161
162    fn get_new<T: ConfigValue>(sync_var: &ArcVar<M>, key: impl Into<ConfigKey>, default: T, insert: bool) -> BoxedVar<T> {
163        // init var to already present value, or default.
164        let key = key.into();
165        let mut used_default = false;
166        let var = match sync_var.with(|m| ConfigMap::get::<T>(m, &key)) {
167            Ok(value) => match value {
168                Some(val) => var(val),
169                None => {
170                    used_default = true;
171                    var(default)
172                }
173            },
174            Err(e) => {
175                tracing::error!("sync config get({key:?}) error, {e:?}");
176                used_default = true;
177                var(default)
178            }
179        };
180
181        if insert && used_default {
182            var.update();
183        }
184
185        // bind entry var
186
187        // config -> entry
188        let wk_var = var.downgrade();
189        sync_var
190            .hook(clmv!(key, |map| {
191                if let Some(var) = wk_var.upgrade() {
192                    match map.value().get::<T>(&key) {
193                        Ok(value) => {
194                            if let Some(value) = value {
195                                var.set(value);
196                            }
197                        }
198                        Err(e) => {
199                            tracing::error!("sync config get({key:?}) error, {e:?}");
200                        }
201                    }
202                    true
203                } else {
204                    false
205                }
206            }))
207            .perm();
208
209        // entry -> config
210        let wk_sync_var = sync_var.downgrade();
211        var.hook(clmv!(|value| {
212            if let Some(sync_var) = wk_sync_var.upgrade() {
213                let value = value.value().clone();
214                sync_var.modify(clmv!(key, |m| {
215                    match ConfigMap::set(m, key.clone(), value) {
216                        Ok(()) => {}
217                        Err(e) => {
218                            tracing::error!("sync config set({key:?}) error, {e:?}");
219                        }
220                    }
221                }));
222                true
223            } else {
224                false
225            }
226        }))
227        .perm();
228
229        var.boxed()
230    }
231}
232impl<M: ConfigMap> AnyConfig for SyncConfig<M> {
233    fn get_raw(&mut self, key: ConfigKey, default: RawConfigValue, insert: bool, shared: bool) -> BoxedVar<RawConfigValue> {
234        if shared {
235            self.shared
236                .get_or_bind(key, |key| Self::get_new_raw(&self.sync_var, key.clone(), default, insert))
237        } else {
238            Self::get_new_raw(&self.sync_var, key, default, insert)
239        }
240    }
241
242    fn contains_key(&mut self, key: ConfigKey) -> BoxedVar<bool> {
243        self.sync_var.map(move |q| q.contains_key(&key)).boxed()
244    }
245
246    fn status(&self) -> BoxedVar<ConfigStatus> {
247        self.status.clone().boxed()
248    }
249
250    fn remove(&mut self, key: &ConfigKey) -> bool {
251        let contains = self.sync_var.with(|q| q.contains_key(key));
252        if contains {
253            self.sync_var.modify(clmv!(key, |m| {
254                ConfigMap::remove(m, &key);
255            }));
256        }
257        contains
258    }
259
260    fn low_memory(&mut self) {
261        self.shared.low_memory();
262    }
263}
264impl<M: ConfigMap> Config for SyncConfig<M> {
265    fn get<T: ConfigValue>(&mut self, key: impl Into<ConfigKey>, default: T, insert: bool) -> BoxedVar<T> {
266        self.shared
267            .get_or_bind(key.into(), |key| Self::get_new(&self.sync_var, key.clone(), default, insert))
268    }
269}