zng_ext_config/
sync.rs
1use std::{path::PathBuf, sync::atomic::AtomicBool};
2
3use atomic::{Atomic, Ordering};
4use zng_clone_move::clmv;
5use zng_ext_fs_watcher::WATCHER;
6use zng_var::{ReadOnlyArcVar, VARS, VarUpdateId};
7
8use super::*;
9
10pub struct SyncConfig<M: ConfigMap> {
17 sync_var: ArcVar<M>,
18 status: ReadOnlyArcVar<ConfigStatus>,
19 shared: ConfigVars,
20}
21impl<M: ConfigMap> SyncConfig<M> {
22 pub fn sync(file: impl Into<PathBuf>) -> Self {
24 let (sync_var, status) = WATCHER.sync_status::<_, _, ConfigStatusError, ConfigStatusError>(
25 file,
26 M::empty(),
27 |r| match (|| M::read(r?))() {
28 Ok(ok) => Ok(Some(ok)),
29 Err(e) => {
30 tracing::error!("sync config read error, {e:?}");
31 Err(vec![Arc::new(e)])
32 }
33 },
34 |map, w| {
35 let r = (|| {
36 let mut w = w?;
37 map.write(&mut w)?;
38 w.commit()
39 })();
40 match r {
41 Ok(()) => Ok(()),
42 Err(e) => {
43 tracing::error!("sync config write error, {e:?}");
44 Err(vec![Arc::new(e)])
45 }
46 }
47 },
48 );
49
50 Self {
51 sync_var,
52 status,
53 shared: ConfigVars::default(),
54 }
55 }
56
57 fn get_new_raw(sync_var: &ArcVar<M>, key: ConfigKey, default: RawConfigValue, insert: bool) -> BoxedVar<RawConfigValue> {
58 let mut used_default = false;
60 let var = match sync_var.with(|m| ConfigMap::get_raw(m, &key)) {
61 Ok(raw) => {
62 match raw {
64 Some(raw) => var(raw),
65 None => {
66 used_default = true;
67 var(default)
68 }
69 }
70 }
71 Err(e) => {
72 tracing::error!("sync config get({key:?}) error, {e:?}");
74 used_default = true;
75 var(default)
76 }
77 };
78
79 if insert && used_default {
80 var.update();
81 }
82
83 let wk_var = var.downgrade();
87 let last_update = Atomic::new(VarUpdateId::never());
88 let request_update = AtomicBool::new(used_default);
89 sync_var
90 .hook(clmv!(key, |map| {
91 let update_id = VARS.update_id();
92 if update_id == last_update.load(Ordering::Relaxed) {
93 return true;
94 }
95 last_update.store(update_id, Ordering::Relaxed);
96 if let Some(var) = wk_var.upgrade() {
97 match map.value().get_raw(&key) {
98 Ok(raw) => {
99 if let Some(raw) = raw {
101 var.set(raw);
102 if request_update.swap(false, Ordering::Relaxed) {
103 var.update();
106 }
107 } else {
108 request_update.store(true, Ordering::Relaxed);
110 }
111 }
112 Err(e) => {
113 tracing::error!("sync config get({key:?}) error, {e:?}");
115 }
116 }
117 true
119 } else {
120 false
122 }
123 }))
124 .perm();
125
126 let wk_sync_var = sync_var.downgrade();
128 let last_update = Atomic::new(VarUpdateId::never());
129 var.hook(clmv!(|value| {
130 let update_id = VARS.update_id();
131 if update_id == last_update.load(Ordering::Relaxed) {
132 return true;
133 }
134 last_update.store(update_id, Ordering::Relaxed);
135 if let Some(sync_var) = wk_sync_var.upgrade() {
136 let raw = value.value().clone();
137 sync_var.modify(clmv!(key, |m| {
138 match ConfigMap::set_raw(m, key.clone(), raw) {
140 Ok(()) => {
141 }
143 Err(e) => {
144 tracing::error!("sync config set({key:?}) error, {e:?}");
146 }
147 }
148 }));
149
150 true
152 } else {
153 false
155 }
156 }))
157 .perm();
158
159 var.boxed()
160 }
161
162 fn get_new<T: ConfigValue>(sync_var: &ArcVar<M>, key: impl Into<ConfigKey>, default: T, insert: bool) -> BoxedVar<T> {
163 let key = key.into();
165 let mut used_default = false;
166 let var = match sync_var.with(|m| ConfigMap::get::<T>(m, &key)) {
167 Ok(value) => match value {
168 Some(val) => var(val),
169 None => {
170 used_default = true;
171 var(default)
172 }
173 },
174 Err(e) => {
175 tracing::error!("sync config get({key:?}) error, {e:?}");
176 used_default = true;
177 var(default)
178 }
179 };
180
181 if insert && used_default {
182 var.update();
183 }
184
185 let wk_var = var.downgrade();
189 sync_var
190 .hook(clmv!(key, |map| {
191 if let Some(var) = wk_var.upgrade() {
192 match map.value().get::<T>(&key) {
193 Ok(value) => {
194 if let Some(value) = value {
195 var.set(value);
196 }
197 }
198 Err(e) => {
199 tracing::error!("sync config get({key:?}) error, {e:?}");
200 }
201 }
202 true
203 } else {
204 false
205 }
206 }))
207 .perm();
208
209 let wk_sync_var = sync_var.downgrade();
211 var.hook(clmv!(|value| {
212 if let Some(sync_var) = wk_sync_var.upgrade() {
213 let value = value.value().clone();
214 sync_var.modify(clmv!(key, |m| {
215 match ConfigMap::set(m, key.clone(), value) {
216 Ok(()) => {}
217 Err(e) => {
218 tracing::error!("sync config set({key:?}) error, {e:?}");
219 }
220 }
221 }));
222 true
223 } else {
224 false
225 }
226 }))
227 .perm();
228
229 var.boxed()
230 }
231}
232impl<M: ConfigMap> AnyConfig for SyncConfig<M> {
233 fn get_raw(&mut self, key: ConfigKey, default: RawConfigValue, insert: bool, shared: bool) -> BoxedVar<RawConfigValue> {
234 if shared {
235 self.shared
236 .get_or_bind(key, |key| Self::get_new_raw(&self.sync_var, key.clone(), default, insert))
237 } else {
238 Self::get_new_raw(&self.sync_var, key, default, insert)
239 }
240 }
241
242 fn contains_key(&mut self, key: ConfigKey) -> BoxedVar<bool> {
243 self.sync_var.map(move |q| q.contains_key(&key)).boxed()
244 }
245
246 fn status(&self) -> BoxedVar<ConfigStatus> {
247 self.status.clone().boxed()
248 }
249
250 fn remove(&mut self, key: &ConfigKey) -> bool {
251 let contains = self.sync_var.with(|q| q.contains_key(key));
252 if contains {
253 self.sync_var.modify(clmv!(key, |m| {
254 ConfigMap::remove(m, &key);
255 }));
256 }
257 contains
258 }
259
260 fn low_memory(&mut self) {
261 self.shared.low_memory();
262 }
263}
264impl<M: ConfigMap> Config for SyncConfig<M> {
265 fn get<T: ConfigValue>(&mut self, key: impl Into<ConfigKey>, default: T, insert: bool) -> BoxedVar<T> {
266 self.shared
267 .get_or_bind(key.into(), |key| Self::get_new(&self.sync_var, key.clone(), default, insert))
268 }
269}