zng_ext_config/
fallback.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
use std::sync::atomic::{AtomicBool, Ordering};

use crate::task::parking_lot::Mutex;
use zng_clone_move::clmv;
use zng_unique_id::unique_id_32;
use zng_var::merge_var;

use super::*;

/// Reset controls of a [`FallbackConfig`].
pub trait FallbackConfigReset: AnyConfig + Sync {
    /// Removes the `key` from the config and updates all active config variables back to
    /// the fallback value. Note that if you assign the config variable the key will be re-inserted on the config.
    fn reset(&self, key: &ConfigKey);

    /// Returns a read-only var that is `true` when the `key` has an entry in the read-write config.
    fn can_reset(&self, key: ConfigKey) -> BoxedVar<bool>;

    /// Clone a reference to the config.
    fn clone_boxed(&self) -> Box<dyn FallbackConfigReset>;
}
impl Clone for Box<dyn FallbackConfigReset> {
    fn clone(&self) -> Self {
        self.clone_boxed()
    }
}

/// Represents a copy-on-write config source that wraps two other sources, a read-write config and a read-only fallback config.
///
/// The config variables are connected to both sources, if the read-write config is not set the var will update with the
/// fallback config, if it is set it will sync with the read-write config.
///
/// The `FallbackConfig` type is an `Arc` internally, so you can keep a cloned reference to it after moving it into
/// [`CONFIG`] or another combinator config.
pub struct FallbackConfig<S: Config, F: Config>(Arc<Mutex<FallbackConfigData<S, F>>>);
impl<S: Config, F: Config> FallbackConfig<S, F> {
    /// New from the read-write config and read-only fallback.
    pub fn new(config: S, fallback: F) -> Self {
        Self(Arc::new(Mutex::new(FallbackConfigData {
            fallback,
            config,
            vars: HashMap::new(),
        })))
    }

    /// Removes the `key` from the config and updates all active config variables back to
    /// the fallback value. Note that if you assign the config variable the key will be re-inserted on the config.
    pub fn reset(&self, key: &ConfigKey) {
        FallbackConfigData::reset(&self.0, key);
    }

    /// Returns a read-only var that is `true` when the `key` has an entry in the read-write config.
    pub fn can_reset(&self, key: ConfigKey) -> BoxedVar<bool> {
        self.0.lock().config.contains_key(key)
    }
}
impl<S: Config, F: Config> Clone for FallbackConfig<S, F> {
    fn clone(&self) -> Self {
        FallbackConfig(Arc::clone(&self.0))
    }
}
impl<S: Config, F: Config> FallbackConfigReset for FallbackConfig<S, F> {
    fn reset(&self, key: &ConfigKey) {
        self.reset(key)
    }

    fn can_reset(&self, key: ConfigKey) -> BoxedVar<bool> {
        self.can_reset(key)
    }

    fn clone_boxed(&self) -> Box<dyn FallbackConfigReset> {
        Box::new(self.clone())
    }
}
impl<S: Config, F: Config> AnyConfig for FallbackConfig<S, F> {
    fn status(&self) -> BoxedVar<ConfigStatus> {
        let d = self.0.lock();
        merge_var!(d.fallback.status(), d.config.status(), |fallback, over| {
            ConfigStatus::merge_status([fallback.clone(), over.clone()].into_iter())
        })
        .boxed()
    }

    fn get_raw(&mut self, key: ConfigKey, default: RawConfigValue, insert: bool, shared: bool) -> BoxedVar<RawConfigValue> {
        let mut d = self.0.lock();
        let d = &mut *d;

        if d.vars.len() > 1000 {
            d.vars.retain(|_, v| v.retain());
        }

        let entry = d.vars.entry(key.clone()).or_default();
        if let Some(res) = entry.res.upgrade() {
            return res.boxed();
        }

        let cfg_contains_key_var = d.config.contains_key(key.clone());
        let is_already_set = cfg_contains_key_var.get();

        let cfg_var = d.config.get_raw(key.clone(), default.clone(), insert, shared);

        let fall_var = d.fallback.get_raw(key, default, false, shared);

        let res_var = var(if is_already_set { cfg_var.get() } else { fall_var.get() });
        entry.res = res_var.downgrade();

        let binding_tag = BindMapBidiTag::new_unique();

        #[derive(Clone, Copy, Debug, PartialEq)]
        struct ResetTag;

        // fallback->res binding can re-enable on reset.
        let fall_res_enabled = Arc::new(AtomicBool::new(!is_already_set));

        // bind cfg_var -> res_var, handles potential bidi binding
        let weak_res_var = res_var.downgrade();
        cfg_var
            .hook(clmv!(fall_res_enabled, |args| {
                if let Some(res_var) = weak_res_var.upgrade() {
                    let is_from_other = args.downcast_tags::<BindMapBidiTag>().any(|&b| b == binding_tag);
                    if !is_from_other {
                        // res_var did not cause this assign, propagate.

                        // disable fallback->res binding
                        fall_res_enabled.store(false, Ordering::Relaxed);

                        let value = args.value().clone();

                        res_var.modify(move |v| {
                            if v.as_ref() != &value {
                                v.set(value);
                                v.push_tag(binding_tag);
                            }
                        });
                    }

                    true
                } else {
                    false
                }
            }))
            .perm();

        // bind fallback_var -> res_var.
        let weak_res_var = res_var.downgrade();
        fall_var
            .hook(clmv!(fall_res_enabled, |args| {
                if let Some(res_var) = weak_res_var.upgrade() {
                    if fall_res_enabled.load(Ordering::Relaxed) {
                        let value = args.value().clone();
                        res_var.modify(move |v| {
                            if v.as_ref() != &value {
                                v.set(value);
                                // don't set cfg_var from fallback update.
                                v.push_tag(binding_tag);
                            }
                        });
                    }

                    true
                } else {
                    false
                }
            }))
            .perm();

        // bind cfg_contains_key_var to restore sync with fallback_var when cannot sync with cfg_var anymore.
        let weak_fall_var = fall_var.downgrade();
        let weak_res_var = res_var.downgrade();
        cfg_contains_key_var
            .hook(clmv!(fall_res_enabled, |args| {
                if let Some(res_var) = weak_res_var.upgrade() {
                    // still alive
                    let can_reset = *args.value();
                    if !can_reset && !fall_res_enabled.load(Ordering::Relaxed) {
                        // cfg_var removed and we are sync with it.
                        if let Some(fall_var) = weak_fall_var.upgrade() {
                            // still alive, sync with fallback_var.
                            let fall_value = fall_var.get();
                            res_var.modify(move |vm| {
                                vm.set(fall_value);
                                vm.push_tag(ResetTag); // res_var will reset
                            });
                        } else {
                            return false;
                        }
                    }
                    true
                } else {
                    false
                }
            }))
            .perm();

        // map res_var -> cfg_var, manages fallback binding.
        res_var
            .hook(move |args| {
                let _strong_ref = (&fall_var, &cfg_contains_key_var);

                let is_from_other = args.downcast_tags::<BindMapBidiTag>().any(|&b| b == binding_tag);
                if !is_from_other {
                    // not set from cfg/fallback

                    let is_reset = args.downcast_tags::<ResetTag>().next().is_some();
                    if is_reset {
                        fall_res_enabled.store(true, Ordering::Relaxed);
                    } else {
                        let after_reset = fall_res_enabled.swap(false, Ordering::Relaxed);
                        let value = args.value().clone();
                        let _ = cfg_var.modify(move |v| {
                            if v.as_ref() != &value {
                                v.set(value);
                                v.push_tag(binding_tag);
                            } else if after_reset {
                                // cfg still has value from before reset, cause it to write
                                v.update();
                            }
                        });
                    }
                }

                true
            })
            .perm();

        res_var.boxed()
    }

    fn contains_key(&mut self, key: ConfigKey) -> BoxedVar<bool> {
        let mut d = self.0.lock();
        merge_var!(d.fallback.contains_key(key.clone()), d.config.contains_key(key), |&a, &b| a || b).boxed()
    }

    fn remove(&mut self, key: &ConfigKey) -> bool {
        let mut d = self.0.lock();
        d.fallback.remove(key) || d.config.remove(key)
    }

    fn low_memory(&mut self) {
        self.0.lock().vars.retain(|_, v| v.retain())
    }
}
impl<S: Config, F: Config> Config for FallbackConfig<S, F> {
    fn get<T: ConfigValue>(&mut self, key: impl Into<ConfigKey>, default: T, insert: bool) -> BoxedVar<T> {
        self.get_raw(key.into(), RawConfigValue::serialize(&default).unwrap(), insert, true)
            .filter_map_bidi(
                |raw| raw.clone().deserialize().ok(),
                |v| RawConfigValue::serialize(v).ok(),
                move || default.clone(),
            )
            .boxed()
    }
}

#[derive(Default)]
struct VarEntry {
    res: WeakArcVar<RawConfigValue>,
}
impl VarEntry {
    fn retain(&self) -> bool {
        self.res.strong_count() > 0
    }
}

struct FallbackConfigData<S: Config, F: Config> {
    fallback: F,
    config: S,

    vars: HashMap<ConfigKey, VarEntry>,
}
impl<S: Config, F: Config> FallbackConfigData<S, F> {
    fn reset(c: &Arc<Mutex<Self>>, key: &ConfigKey) {
        let mut d = c.lock();
        let d = &mut *d;

        d.vars.retain(|_, v| v.retain());

        // Just remove, we already bind with `config.contains_key` and will
        // reset when it changes to `false`.
        d.config.remove(key);
    }
}

unique_id_32! {
    /// Used to stop an extra "map_back" caused by "map" itself
    #[derive(Debug)]
    struct BindMapBidiTag;
}
zng_unique_id::impl_unique_id_bytemuck!(BindMapBidiTag);