zng_ext_config/
fallback.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2
3use crate::task::parking_lot::Mutex;
4use zng_clone_move::clmv;
5use zng_unique_id::unique_id_32;
6use zng_var::merge_var;
7
8use super::*;
9
10/// Reset controls of a [`FallbackConfig`].
11pub trait FallbackConfigReset: AnyConfig + Sync {
12    /// Removes the `key` from the config and updates all active config variables back to
13    /// the fallback value. Note that if you assign the config variable the key will be re-inserted on the config.
14    fn reset(&self, key: &ConfigKey);
15
16    /// Returns a read-only var that is `true` when the `key` has an entry in the read-write config.
17    fn can_reset(&self, key: ConfigKey) -> BoxedVar<bool>;
18
19    /// Clone a reference to the config.
20    fn clone_boxed(&self) -> Box<dyn FallbackConfigReset>;
21}
22impl Clone for Box<dyn FallbackConfigReset> {
23    fn clone(&self) -> Self {
24        self.clone_boxed()
25    }
26}
27
28/// Represents a copy-on-write config source that wraps two other sources, a read-write config and a read-only fallback config.
29///
30/// The config variables are connected to both sources, if the read-write config is not set the var will update with the
31/// fallback config, if it is set it will sync with the read-write config.
32///
33/// The `FallbackConfig` type is an `Arc` internally, so you can keep a cloned reference to it after moving it into
34/// [`CONFIG`] or another combinator config.
35pub struct FallbackConfig<S: Config, F: Config>(Arc<Mutex<FallbackConfigData<S, F>>>);
36impl<S: Config, F: Config> FallbackConfig<S, F> {
37    /// New from the read-write config and read-only fallback.
38    pub fn new(config: S, fallback: F) -> Self {
39        Self(Arc::new(Mutex::new(FallbackConfigData {
40            fallback,
41            config,
42            vars: HashMap::new(),
43        })))
44    }
45
46    /// Removes the `key` from the config and updates all active config variables back to
47    /// the fallback value. Note that if you assign the config variable the key will be re-inserted on the config.
48    pub fn reset(&self, key: &ConfigKey) {
49        FallbackConfigData::reset(&self.0, key);
50    }
51
52    /// Returns a read-only var that is `true` when the `key` has an entry in the read-write config.
53    pub fn can_reset(&self, key: ConfigKey) -> BoxedVar<bool> {
54        self.0.lock().config.contains_key(key)
55    }
56}
57impl<S: Config, F: Config> Clone for FallbackConfig<S, F> {
58    fn clone(&self) -> Self {
59        FallbackConfig(Arc::clone(&self.0))
60    }
61}
62impl<S: Config, F: Config> FallbackConfigReset for FallbackConfig<S, F> {
63    fn reset(&self, key: &ConfigKey) {
64        self.reset(key)
65    }
66
67    fn can_reset(&self, key: ConfigKey) -> BoxedVar<bool> {
68        self.can_reset(key)
69    }
70
71    fn clone_boxed(&self) -> Box<dyn FallbackConfigReset> {
72        Box::new(self.clone())
73    }
74}
75impl<S: Config, F: Config> AnyConfig for FallbackConfig<S, F> {
76    fn status(&self) -> BoxedVar<ConfigStatus> {
77        let d = self.0.lock();
78        merge_var!(d.fallback.status(), d.config.status(), |fallback, over| {
79            ConfigStatus::merge_status([fallback.clone(), over.clone()].into_iter())
80        })
81        .boxed()
82    }
83
84    fn get_raw(&mut self, key: ConfigKey, default: RawConfigValue, insert: bool, shared: bool) -> BoxedVar<RawConfigValue> {
85        let mut d = self.0.lock();
86        let d = &mut *d;
87
88        if d.vars.len() > 1000 {
89            d.vars.retain(|_, v| v.retain());
90        }
91
92        let entry = d.vars.entry(key.clone()).or_default();
93        if let Some(res) = entry.res.upgrade() {
94            return res.boxed();
95        }
96
97        let cfg_contains_key_var = d.config.contains_key(key.clone());
98        let is_already_set = cfg_contains_key_var.get();
99
100        let cfg_var = d.config.get_raw(key.clone(), default.clone(), insert, shared);
101
102        let fall_var = d.fallback.get_raw(key, default, false, shared);
103
104        let res_var = var(if is_already_set { cfg_var.get() } else { fall_var.get() });
105        entry.res = res_var.downgrade();
106
107        let binding_tag = BindMapBidiTag::new_unique();
108
109        #[derive(Clone, Copy, Debug, PartialEq)]
110        struct ResetTag;
111
112        // fallback->res binding can re-enable on reset.
113        let fall_res_enabled = Arc::new(AtomicBool::new(!is_already_set));
114
115        // bind cfg_var -> res_var, handles potential bidi binding
116        let weak_res_var = res_var.downgrade();
117        cfg_var
118            .hook(clmv!(fall_res_enabled, |args| {
119                if let Some(res_var) = weak_res_var.upgrade() {
120                    let is_from_other = args.downcast_tags::<BindMapBidiTag>().any(|&b| b == binding_tag);
121                    if !is_from_other {
122                        // res_var did not cause this assign, propagate.
123
124                        // disable fallback->res binding
125                        fall_res_enabled.store(false, Ordering::Relaxed);
126
127                        let value = args.value().clone();
128
129                        res_var.modify(move |v| {
130                            if v.as_ref() != &value {
131                                v.set(value);
132                                v.push_tag(binding_tag);
133                            }
134                        });
135                    }
136
137                    true
138                } else {
139                    false
140                }
141            }))
142            .perm();
143
144        // bind fallback_var -> res_var.
145        let weak_res_var = res_var.downgrade();
146        fall_var
147            .hook(clmv!(fall_res_enabled, |args| {
148                if let Some(res_var) = weak_res_var.upgrade() {
149                    if fall_res_enabled.load(Ordering::Relaxed) {
150                        let value = args.value().clone();
151                        res_var.modify(move |v| {
152                            if v.as_ref() != &value {
153                                v.set(value);
154                                // don't set cfg_var from fallback update.
155                                v.push_tag(binding_tag);
156                            }
157                        });
158                    }
159
160                    true
161                } else {
162                    false
163                }
164            }))
165            .perm();
166
167        // bind cfg_contains_key_var to restore sync with fallback_var when cannot sync with cfg_var anymore.
168        let weak_fall_var = fall_var.downgrade();
169        let weak_res_var = res_var.downgrade();
170        cfg_contains_key_var
171            .hook(clmv!(fall_res_enabled, |args| {
172                if let Some(res_var) = weak_res_var.upgrade() {
173                    // still alive
174                    let can_reset = *args.value();
175                    if !can_reset && !fall_res_enabled.load(Ordering::Relaxed) {
176                        // cfg_var removed and we are sync with it.
177                        if let Some(fall_var) = weak_fall_var.upgrade() {
178                            // still alive, sync with fallback_var.
179                            let fall_value = fall_var.get();
180                            res_var.modify(move |vm| {
181                                vm.set(fall_value);
182                                vm.push_tag(ResetTag); // res_var will reset
183                            });
184                        } else {
185                            return false;
186                        }
187                    }
188                    true
189                } else {
190                    false
191                }
192            }))
193            .perm();
194
195        // map res_var -> cfg_var, manages fallback binding.
196        res_var
197            .hook(move |args| {
198                let _strong_ref = (&fall_var, &cfg_contains_key_var);
199
200                let is_from_other = args.downcast_tags::<BindMapBidiTag>().any(|&b| b == binding_tag);
201                if !is_from_other {
202                    // not set from cfg/fallback
203
204                    let is_reset = args.downcast_tags::<ResetTag>().next().is_some();
205                    if is_reset {
206                        fall_res_enabled.store(true, Ordering::Relaxed);
207                    } else {
208                        let after_reset = fall_res_enabled.swap(false, Ordering::Relaxed);
209                        let value = args.value().clone();
210                        let _ = cfg_var.modify(move |v| {
211                            if v.as_ref() != &value {
212                                v.set(value);
213                                v.push_tag(binding_tag);
214                            } else if after_reset {
215                                // cfg still has value from before reset, cause it to write
216                                v.update();
217                            }
218                        });
219                    }
220                }
221
222                true
223            })
224            .perm();
225
226        res_var.boxed()
227    }
228
229    fn contains_key(&mut self, key: ConfigKey) -> BoxedVar<bool> {
230        let mut d = self.0.lock();
231        merge_var!(d.fallback.contains_key(key.clone()), d.config.contains_key(key), |&a, &b| a || b).boxed()
232    }
233
234    fn remove(&mut self, key: &ConfigKey) -> bool {
235        let mut d = self.0.lock();
236        d.fallback.remove(key) || d.config.remove(key)
237    }
238
239    fn low_memory(&mut self) {
240        self.0.lock().vars.retain(|_, v| v.retain())
241    }
242}
243impl<S: Config, F: Config> Config for FallbackConfig<S, F> {
244    fn get<T: ConfigValue>(&mut self, key: impl Into<ConfigKey>, default: T, insert: bool) -> BoxedVar<T> {
245        self.get_raw(key.into(), RawConfigValue::serialize(&default).unwrap(), insert, true)
246            .filter_map_bidi(
247                |raw| raw.clone().deserialize().ok(),
248                |v| RawConfigValue::serialize(v).ok(),
249                move || default.clone(),
250            )
251            .boxed()
252    }
253}
254
255#[derive(Default)]
256struct VarEntry {
257    res: WeakArcVar<RawConfigValue>,
258}
259impl VarEntry {
260    fn retain(&self) -> bool {
261        self.res.strong_count() > 0
262    }
263}
264
265struct FallbackConfigData<S: Config, F: Config> {
266    fallback: F,
267    config: S,
268
269    vars: HashMap<ConfigKey, VarEntry>,
270}
271impl<S: Config, F: Config> FallbackConfigData<S, F> {
272    fn reset(c: &Arc<Mutex<Self>>, key: &ConfigKey) {
273        let mut d = c.lock();
274        let d = &mut *d;
275
276        d.vars.retain(|_, v| v.retain());
277
278        // Just remove, we already bind with `config.contains_key` and will
279        // reset when it changes to `false`.
280        d.config.remove(key);
281    }
282}
283
284unique_id_32! {
285    /// Used to stop an extra "map_back" caused by "map" itself
286    #[derive(Debug)]
287    struct BindMapBidiTag;
288}
289zng_unique_id::impl_unique_id_bytemuck!(BindMapBidiTag);