1use std::{
2 collections::HashMap,
3 fmt, mem,
4 path::PathBuf,
5 sync::Arc,
6 task::Waker,
7 time::{Duration, Instant},
8};
9
10use crate::Deadline;
11use parking_lot::Mutex;
12use zng_app_context::{AppScope, app_local};
13use zng_task::DEADLINE_APP;
14use zng_time::{INSTANT_APP, InstantMode};
15use zng_txt::Txt;
16use zng_var::{ArcVar, ReadOnlyArcVar, ResponderVar, ResponseVar, VARS, VARS_APP, Var as _, response_var};
17
18use crate::{
19 APP, AppControlFlow, AppEventObserver, AppExtension, AppExtensionsInfo, DInstant, INSTANT,
20 event::{
21 AnyEventArgs, AppDisconnected, CommandHandle, CommandInfoExt, CommandNameExt, EVENTS, EventPropagationHandle,
22 TimeoutOrAppDisconnected, command, event,
23 },
24 event_args,
25 shortcut::CommandShortcutExt,
26 shortcut::shortcut,
27 timer::TimersService,
28 update::{
29 ContextUpdates, EventUpdate, InfoUpdates, LayoutUpdates, RenderUpdates, UPDATES, UpdateOp, UpdateTrace, UpdatesTrace, WidgetUpdates,
30 },
31 view_process::{raw_device_events::DeviceId, *},
32 widget::WidgetId,
33 window::WindowId,
34};
35
36pub(crate) struct RunningApp<E: AppExtension> {
38 extensions: (AppIntrinsic, E),
39
40 receiver: flume::Receiver<AppEvent>,
41
42 loop_timer: LoopTimer,
43 loop_monitor: LoopMonitor,
44
45 pending_view_events: Vec<zng_view_api::Event>,
46 pending_view_frame_events: Vec<zng_view_api::window::EventFrameRendered>,
47 pending: ContextUpdates,
48
49 exited: bool,
50
51 _scope: AppScope,
53}
54impl<E: AppExtension> RunningApp<E> {
55 pub(crate) fn start(
56 scope: AppScope,
57 mut extensions: E,
58 is_headed: bool,
59 with_renderer: bool,
60 view_process_exe: Option<PathBuf>,
61 view_process_env: HashMap<Txt, Txt>,
62 ) -> Self {
63 let _s = tracing::debug_span!("APP::start").entered();
64
65 let (sender, receiver) = AppEventSender::new();
66
67 UPDATES.init(sender);
68
69 fn app_waker() {
70 UPDATES.update(None);
71 }
72 VARS_APP.init_app_waker(app_waker);
73 VARS_APP.init_modify_trace(UpdatesTrace::log_var);
74 DEADLINE_APP.init_deadline_service(crate::timer::deadline_service);
75 zng_var::types::TRANSITIONABLE_APP.init_rgba_lerp(zng_color::lerp_rgba);
76
77 let mut info = AppExtensionsInfo::start();
78 {
79 let _t = INSTANT_APP.pause_for_update();
80 extensions.register(&mut info);
81 }
82 let device_events = extensions.enable_device_events();
83
84 {
85 let mut sv = APP_PROCESS_SV.write();
86 sv.set_extensions(info, device_events);
87 }
88
89 if with_renderer && view_process_exe.is_none() {
90 zng_env::assert_inited();
91 }
92
93 #[cfg(not(target_arch = "wasm32"))]
94 let view_process_exe = view_process_exe.unwrap_or_else(|| std::env::current_exe().expect("current_exe"));
95 #[cfg(target_arch = "wasm32")]
96 let view_process_exe = std::path::PathBuf::from("<wasm>");
97
98 let process = AppIntrinsic::pre_init(is_headed, with_renderer, view_process_exe, view_process_env, device_events);
99
100 {
101 let _s = tracing::debug_span!("extensions.init").entered();
102 extensions.init();
103 }
104
105 let args = AppStartArgs { _private: () };
106 for h in zng_unique_id::hot_static_ref!(ON_APP_START).lock().iter_mut() {
107 h(&args)
108 }
109
110 RunningApp {
111 extensions: (process, extensions),
112
113 receiver,
114
115 loop_timer: LoopTimer::default(),
116 loop_monitor: LoopMonitor::default(),
117
118 pending_view_events: Vec::with_capacity(100),
119 pending_view_frame_events: Vec::with_capacity(5),
120 pending: ContextUpdates {
121 events: Vec::with_capacity(100),
122 update: false,
123 info: false,
124 layout: false,
125 render: false,
126 update_widgets: WidgetUpdates::default(),
127 info_widgets: InfoUpdates::default(),
128 layout_widgets: LayoutUpdates::default(),
129 render_widgets: RenderUpdates::default(),
130 render_update_widgets: RenderUpdates::default(),
131 },
132 exited: false,
133
134 _scope: scope,
135 }
136 }
137
138 pub fn has_exited(&self) -> bool {
139 self.exited
140 }
141
142 pub fn notify_event<O: AppEventObserver>(&mut self, mut update: EventUpdate, observer: &mut O) {
144 let _scope = tracing::trace_span!("notify_event", event = update.event().name()).entered();
145
146 let _t = INSTANT_APP.pause_for_update();
147
148 update.event().on_update(&mut update);
149
150 self.extensions.event_preview(&mut update);
151 observer.event_preview(&mut update);
152 update.call_pre_actions();
153
154 self.extensions.event_ui(&mut update);
155 observer.event_ui(&mut update);
156
157 self.extensions.event(&mut update);
158 observer.event(&mut update);
159 update.call_pos_actions();
160 }
161
162 fn device_id(&mut self, id: zng_view_api::DeviceId) -> DeviceId {
163 VIEW_PROCESS.device_id(id)
164 }
165
166 fn on_view_event<O: AppEventObserver>(&mut self, ev: zng_view_api::Event, observer: &mut O) {
168 use crate::view_process::raw_device_events::*;
169 use crate::view_process::raw_events::*;
170 use zng_view_api::Event;
171
172 fn window_id(id: zng_view_api::window::WindowId) -> WindowId {
173 WindowId::from_raw(id.get())
174 }
175
176 match ev {
177 Event::MouseMoved {
178 window: w_id,
179 device: d_id,
180 coalesced_pos,
181 position,
182 } => {
183 let args = RawMouseMovedArgs::now(window_id(w_id), self.device_id(d_id), coalesced_pos, position);
184 self.notify_event(RAW_MOUSE_MOVED_EVENT.new_update(args), observer);
185 }
186 Event::MouseEntered {
187 window: w_id,
188 device: d_id,
189 } => {
190 let args = RawMouseArgs::now(window_id(w_id), self.device_id(d_id));
191 self.notify_event(RAW_MOUSE_ENTERED_EVENT.new_update(args), observer);
192 }
193 Event::MouseLeft {
194 window: w_id,
195 device: d_id,
196 } => {
197 let args = RawMouseArgs::now(window_id(w_id), self.device_id(d_id));
198 self.notify_event(RAW_MOUSE_LEFT_EVENT.new_update(args), observer);
199 }
200 Event::WindowChanged(c) => {
201 let monitor_id = c.monitor.map(|id| VIEW_PROCESS.monitor_id(id));
202 let args = RawWindowChangedArgs::now(
203 window_id(c.window),
204 c.state,
205 c.position,
206 monitor_id,
207 c.size,
208 c.safe_padding,
209 c.cause,
210 c.frame_wait_id,
211 );
212 self.notify_event(RAW_WINDOW_CHANGED_EVENT.new_update(args), observer);
213 }
214 Event::DragHovered { window, data, allowed } => {
215 let args = RawDragHoveredArgs::now(window_id(window), data, allowed);
216 self.notify_event(RAW_DRAG_HOVERED_EVENT.new_update(args), observer);
217 }
218 Event::DragMoved {
219 window,
220 coalesced_pos,
221 position,
222 } => {
223 let args = RawDragMovedArgs::now(window_id(window), coalesced_pos, position);
224 self.notify_event(RAW_DRAG_MOVED_EVENT.new_update(args), observer);
225 }
226 Event::DragDropped {
227 window,
228 data,
229 allowed,
230 drop_id,
231 } => {
232 let args = RawDragDroppedArgs::now(window_id(window), data, allowed, drop_id);
233 self.notify_event(RAW_DRAG_DROPPED_EVENT.new_update(args), observer);
234 }
235 Event::DragCancelled { window } => {
236 let args = RawDragCancelledArgs::now(window_id(window));
237 self.notify_event(RAW_DRAG_CANCELLED_EVENT.new_update(args), observer);
238 }
239 Event::AppDragEnded { window, drag, applied } => {
240 let args = RawAppDragEndedArgs::now(window_id(window), drag, applied);
241 self.notify_event(RAW_APP_DRAG_ENDED_EVENT.new_update(args), observer);
242 }
243 Event::FocusChanged { prev, new } => {
244 let args = RawWindowFocusArgs::now(prev.map(window_id), new.map(window_id));
245 self.notify_event(RAW_WINDOW_FOCUS_EVENT.new_update(args), observer);
246 }
247 Event::KeyboardInput {
248 window: w_id,
249 device: d_id,
250 key_code,
251 state,
252 key,
253 key_location,
254 key_modified,
255 text,
256 } => {
257 let args = RawKeyInputArgs::now(
258 window_id(w_id),
259 self.device_id(d_id),
260 key_code,
261 key_location,
262 state,
263 key,
264 key_modified,
265 text,
266 );
267 self.notify_event(RAW_KEY_INPUT_EVENT.new_update(args), observer);
268 }
269 Event::Ime { window: w_id, ime } => {
270 let args = RawImeArgs::now(window_id(w_id), ime);
271 self.notify_event(RAW_IME_EVENT.new_update(args), observer);
272 }
273
274 Event::MouseWheel {
275 window: w_id,
276 device: d_id,
277 delta,
278 phase,
279 } => {
280 let args = RawMouseWheelArgs::now(window_id(w_id), self.device_id(d_id), delta, phase);
281 self.notify_event(RAW_MOUSE_WHEEL_EVENT.new_update(args), observer);
282 }
283 Event::MouseInput {
284 window: w_id,
285 device: d_id,
286 state,
287 button,
288 } => {
289 let args = RawMouseInputArgs::now(window_id(w_id), self.device_id(d_id), state, button);
290 self.notify_event(RAW_MOUSE_INPUT_EVENT.new_update(args), observer);
291 }
292 Event::TouchpadPressure {
293 window: w_id,
294 device: d_id,
295 pressure,
296 stage,
297 } => {
298 let args = RawTouchpadPressureArgs::now(window_id(w_id), self.device_id(d_id), pressure, stage);
299 self.notify_event(RAW_TOUCHPAD_PRESSURE_EVENT.new_update(args), observer);
300 }
301 Event::AxisMotion {
302 window: w_id,
303 device: d_id,
304 axis,
305 value,
306 } => {
307 let args = RawAxisMotionArgs::now(window_id(w_id), self.device_id(d_id), axis, value);
308 self.notify_event(RAW_AXIS_MOTION_EVENT.new_update(args), observer);
309 }
310 Event::Touch {
311 window: w_id,
312 device: d_id,
313 touches,
314 } => {
315 let args = RawTouchArgs::now(window_id(w_id), self.device_id(d_id), touches);
316 self.notify_event(RAW_TOUCH_EVENT.new_update(args), observer);
317 }
318 Event::ScaleFactorChanged {
319 monitor: id,
320 windows,
321 scale_factor,
322 } => {
323 let monitor_id = VIEW_PROCESS.monitor_id(id);
324 let windows: Vec<_> = windows.into_iter().map(window_id).collect();
325 let args = RawScaleFactorChangedArgs::now(monitor_id, windows, scale_factor);
326 self.notify_event(RAW_SCALE_FACTOR_CHANGED_EVENT.new_update(args), observer);
327 }
328 Event::MonitorsChanged(monitors) => {
329 let monitors: Vec<_> = monitors.into_iter().map(|(id, info)| (VIEW_PROCESS.monitor_id(id), info)).collect();
330 let args = RawMonitorsChangedArgs::now(monitors);
331 self.notify_event(RAW_MONITORS_CHANGED_EVENT.new_update(args), observer);
332 }
333 Event::WindowCloseRequested(w_id) => {
334 let args = RawWindowCloseRequestedArgs::now(window_id(w_id));
335 self.notify_event(RAW_WINDOW_CLOSE_REQUESTED_EVENT.new_update(args), observer);
336 }
337 Event::WindowOpened(w_id, data) => {
338 let w_id = window_id(w_id);
339 let (window, data) = VIEW_PROCESS.on_window_opened(w_id, data);
340 let args = RawWindowOpenArgs::now(w_id, window, data);
341 self.notify_event(RAW_WINDOW_OPEN_EVENT.new_update(args), observer);
342 }
343 Event::HeadlessOpened(w_id, data) => {
344 let w_id = window_id(w_id);
345 let (surface, data) = VIEW_PROCESS.on_headless_opened(w_id, data);
346 let args = RawHeadlessOpenArgs::now(w_id, surface, data);
347 self.notify_event(RAW_HEADLESS_OPEN_EVENT.new_update(args), observer);
348 }
349 Event::WindowOrHeadlessOpenError { id: w_id, error } => {
350 let w_id = window_id(w_id);
351 let args = RawWindowOrHeadlessOpenErrorArgs::now(w_id, error);
352 self.notify_event(RAW_WINDOW_OR_HEADLESS_OPEN_ERROR_EVENT.new_update(args), observer);
353 }
354 Event::WindowClosed(w_id) => {
355 let args = RawWindowCloseArgs::now(window_id(w_id));
356 self.notify_event(RAW_WINDOW_CLOSE_EVENT.new_update(args), observer);
357 }
358 Event::ImageMetadataLoaded {
359 image: id,
360 size,
361 ppi,
362 is_mask,
363 } => {
364 if let Some(img) = VIEW_PROCESS.on_image_metadata_loaded(id, size, ppi, is_mask) {
365 let args = RawImageArgs::now(img);
366 self.notify_event(RAW_IMAGE_METADATA_LOADED_EVENT.new_update(args), observer);
367 }
368 }
369 Event::ImagePartiallyLoaded {
370 image: id,
371 partial_size,
372 ppi,
373 is_opaque,
374 is_mask,
375 partial_pixels: partial_bgra8,
376 } => {
377 if let Some(img) = VIEW_PROCESS.on_image_partially_loaded(id, partial_size, ppi, is_opaque, is_mask, partial_bgra8) {
378 let args = RawImageArgs::now(img);
379 self.notify_event(RAW_IMAGE_PARTIALLY_LOADED_EVENT.new_update(args), observer);
380 }
381 }
382 Event::ImageLoaded(image) => {
383 if let Some(img) = VIEW_PROCESS.on_image_loaded(image) {
384 let args = RawImageArgs::now(img);
385 self.notify_event(RAW_IMAGE_LOADED_EVENT.new_update(args), observer);
386 }
387 }
388 Event::ImageLoadError { image: id, error } => {
389 if let Some(img) = VIEW_PROCESS.on_image_error(id, error) {
390 let args = RawImageArgs::now(img);
391 self.notify_event(RAW_IMAGE_LOAD_ERROR_EVENT.new_update(args), observer);
392 }
393 }
394 Event::ImageEncoded { image: id, format, data } => VIEW_PROCESS.on_image_encoded(id, format, data),
395 Event::ImageEncodeError { image: id, format, error } => {
396 VIEW_PROCESS.on_image_encode_error(id, format, error);
397 }
398 Event::FrameImageReady {
399 window: w_id,
400 frame: frame_id,
401 image: image_id,
402 selection,
403 } => {
404 if let Some(img) = VIEW_PROCESS.on_frame_image_ready(image_id) {
405 let args = RawFrameImageReadyArgs::now(img, window_id(w_id), frame_id, selection);
406 self.notify_event(RAW_FRAME_IMAGE_READY_EVENT.new_update(args), observer);
407 }
408 }
409
410 Event::AccessInit { window: w_id } => {
411 self.notify_event(crate::access::on_access_init(window_id(w_id)), observer);
412 }
413 Event::AccessCommand {
414 window: win_id,
415 target: wgt_id,
416 command,
417 } => {
418 if let Some(update) = crate::access::on_access_command(window_id(win_id), WidgetId::from_raw(wgt_id.0), command) {
419 self.notify_event(update, observer);
420 }
421 }
422 Event::AccessDeinit { window: w_id } => {
423 self.notify_event(crate::access::on_access_deinit(window_id(w_id)), observer);
424 }
425
426 Event::MsgDialogResponse(id, response) => {
428 VIEW_PROCESS.on_message_dlg_response(id, response);
429 }
430 Event::FileDialogResponse(id, response) => {
431 VIEW_PROCESS.on_file_dlg_response(id, response);
432 }
433
434 Event::ExtensionEvent(id, payload) => {
436 let args = RawExtensionEventArgs::now(id, payload);
437 self.notify_event(RAW_EXTENSION_EVENT.new_update(args), observer);
438 }
439
440 Event::FontsChanged => {
442 let args = RawFontChangedArgs::now();
443 self.notify_event(RAW_FONT_CHANGED_EVENT.new_update(args), observer);
444 }
445 Event::FontAaChanged(aa) => {
446 let args = RawFontAaChangedArgs::now(aa);
447 self.notify_event(RAW_FONT_AA_CHANGED_EVENT.new_update(args), observer);
448 }
449 Event::MultiClickConfigChanged(cfg) => {
450 let args = RawMultiClickConfigChangedArgs::now(cfg);
451 self.notify_event(RAW_MULTI_CLICK_CONFIG_CHANGED_EVENT.new_update(args), observer);
452 }
453 Event::AnimationsConfigChanged(cfg) => {
454 VARS_APP.set_sys_animations_enabled(cfg.enabled);
455 let args = RawAnimationsConfigChangedArgs::now(cfg);
456 self.notify_event(RAW_ANIMATIONS_CONFIG_CHANGED_EVENT.new_update(args), observer);
457 }
458 Event::KeyRepeatConfigChanged(cfg) => {
459 let args = RawKeyRepeatConfigChangedArgs::now(cfg);
460 self.notify_event(RAW_KEY_REPEAT_CONFIG_CHANGED_EVENT.new_update(args), observer);
461 }
462 Event::TouchConfigChanged(cfg) => {
463 let args = RawTouchConfigChangedArgs::now(cfg);
464 self.notify_event(RAW_TOUCH_CONFIG_CHANGED_EVENT.new_update(args), observer);
465 }
466 Event::LocaleChanged(cfg) => {
467 let args = RawLocaleChangedArgs::now(cfg);
468 self.notify_event(RAW_LOCALE_CONFIG_CHANGED_EVENT.new_update(args), observer);
469 }
470 Event::ColorsConfigChanged(cfg) => {
471 let args = RawColorsConfigChangedArgs::now(cfg);
472 self.notify_event(RAW_COLORS_CONFIG_CHANGED_EVENT.new_update(args), observer);
473 }
474 Event::ChromeConfigChanged(cfg) => {
475 let args = RawChromeConfigChangedArgs::now(cfg);
476 self.notify_event(RAW_CHROME_CONFIG_CHANGED_EVENT.new_update(args), observer);
477 }
478
479 Event::DeviceAdded(d_id) => {
481 let args = DeviceArgs::now(self.device_id(d_id));
482 self.notify_event(DEVICE_ADDED_EVENT.new_update(args), observer);
483 }
484 Event::DeviceRemoved(d_id) => {
485 let args = DeviceArgs::now(self.device_id(d_id));
486 self.notify_event(DEVICE_REMOVED_EVENT.new_update(args), observer);
487 }
488 Event::DeviceMouseMotion { device: d_id, delta } => {
489 let args = MouseMotionArgs::now(self.device_id(d_id), delta);
490 self.notify_event(MOUSE_MOTION_EVENT.new_update(args), observer);
491 }
492 Event::DeviceMouseWheel { device: d_id, delta } => {
493 let args = MouseWheelArgs::now(self.device_id(d_id), delta);
494 self.notify_event(MOUSE_WHEEL_EVENT.new_update(args), observer);
495 }
496 Event::DeviceMotion { device: d_id, axis, value } => {
497 let args = MotionArgs::now(self.device_id(d_id), axis, value);
498 self.notify_event(MOTION_EVENT.new_update(args), observer);
499 }
500 Event::DeviceButton {
501 device: d_id,
502 button,
503 state,
504 } => {
505 let args = ButtonArgs::now(self.device_id(d_id), button, state);
506 self.notify_event(BUTTON_EVENT.new_update(args), observer);
507 }
508 Event::DeviceKey {
509 device: d_id,
510 key_code,
511 state,
512 } => {
513 let args = KeyArgs::now(self.device_id(d_id), key_code, state);
514 self.notify_event(KEY_EVENT.new_update(args), observer);
515 }
516
517 Event::LowMemory => {
518 LOW_MEMORY_EVENT.notify(LowMemoryArgs::now());
519 }
520
521 Event::RecoveredFromComponentPanic { component, recover, panic } => {
522 tracing::error!(
523 "view-process recovered from internal component panic\n component: {component}\n recover: {recover}\n```panic\n{panic}\n```"
524 );
525 }
526
527 Event::Inited(zng_view_api::Inited { .. }) | Event::Suspended | Event::Disconnected(_) | Event::FrameRendered(_) => {
529 unreachable!()
530 } }
532 }
533
534 fn on_view_rendered_event<O: AppEventObserver>(&mut self, ev: zng_view_api::window::EventFrameRendered, observer: &mut O) {
536 debug_assert!(ev.window != zng_view_api::window::WindowId::INVALID);
537 let window_id = WindowId::from_raw(ev.window.get());
538 let image = ev.frame_image.map(|img| VIEW_PROCESS.on_frame_image(img));
540 let args = crate::view_process::raw_events::RawFrameRenderedArgs::now(window_id, ev.frame, image);
541 self.notify_event(crate::view_process::raw_events::RAW_FRAME_RENDERED_EVENT.new_update(args), observer);
542 }
543
544 pub(crate) fn run_headed(mut self) {
545 let mut observer = ();
546 #[cfg(feature = "dyn_app_extension")]
547 let mut observer = observer.as_dyn();
548
549 self.apply_updates(&mut observer);
550 self.apply_update_events(&mut observer);
551 let mut wait = false;
552 loop {
553 wait = match self.poll_impl(wait, &mut observer) {
554 AppControlFlow::Poll => false,
555 AppControlFlow::Wait => true,
556 AppControlFlow::Exit => break,
557 };
558 }
559 }
560
561 fn push_coalesce<O: AppEventObserver>(&mut self, ev: AppEvent, observer: &mut O) {
562 match ev {
563 AppEvent::ViewEvent(ev) => match ev {
564 zng_view_api::Event::FrameRendered(ev) => {
565 if ev.window == zng_view_api::window::WindowId::INVALID {
566 tracing::error!("ignored rendered event for invalid window id, {ev:?}");
567 return;
568 }
569
570 let window = WindowId::from_raw(ev.window.get());
571
572 {
574 if VIEW_PROCESS.is_available() {
575 VIEW_PROCESS.on_frame_rendered(window);
576 }
577 }
578
579 #[cfg(debug_assertions)]
580 if self.pending_view_frame_events.iter().any(|e| e.window == ev.window) {
581 tracing::warn!("window `{window:?}` probably sent a frame request without awaiting renderer idle");
582 }
583
584 self.pending_view_frame_events.push(ev);
585 }
586 zng_view_api::Event::Inited(zng_view_api::Inited {
587 generation,
588 is_respawn,
589 available_monitors,
590 multi_click_config,
591 key_repeat_config,
592 touch_config,
593 font_aa,
594 animations_config,
595 locale_config,
596 colors_config,
597 chrome_config,
598 extensions,
599 }) => {
600 if is_respawn {
602 VIEW_PROCESS.on_respawned(generation);
603 APP_PROCESS_SV.read().is_suspended.set(false);
604 }
605
606 VIEW_PROCESS.handle_inited(generation, extensions.clone());
607
608 let monitors: Vec<_> = available_monitors
609 .into_iter()
610 .map(|(id, info)| (VIEW_PROCESS.monitor_id(id), info))
611 .collect();
612
613 VARS.animations_enabled().set(animations_config.enabled);
614
615 let args = crate::view_process::ViewProcessInitedArgs::now(
616 generation,
617 is_respawn,
618 monitors,
619 multi_click_config,
620 key_repeat_config,
621 touch_config,
622 font_aa,
623 animations_config,
624 locale_config,
625 colors_config,
626 chrome_config,
627 extensions,
628 );
629 self.notify_event(VIEW_PROCESS_INITED_EVENT.new_update(args), observer);
630 }
631 zng_view_api::Event::Suspended => {
632 VIEW_PROCESS.handle_suspended();
633 let args = crate::view_process::ViewProcessSuspendedArgs::now();
634 self.notify_event(VIEW_PROCESS_SUSPENDED_EVENT.new_update(args), observer);
635 APP_PROCESS_SV.read().is_suspended.set(true);
636 }
637 zng_view_api::Event::Disconnected(vp_gen) => {
638 VIEW_PROCESS.handle_disconnect(vp_gen);
640 }
641 ev => {
642 if let Some(last) = self.pending_view_events.last_mut() {
643 match last.coalesce(ev) {
644 Ok(()) => {}
645 Err(ev) => self.pending_view_events.push(ev),
646 }
647 } else {
648 self.pending_view_events.push(ev);
649 }
650 }
651 },
652 AppEvent::Event(ev) => EVENTS.notify(ev.get()),
653 AppEvent::Update(op, target) => {
654 UPDATES.update_op(op, target);
655 }
656 AppEvent::CheckUpdate => {}
657 AppEvent::ResumeUnwind(p) => std::panic::resume_unwind(p),
658 }
659 }
660
661 fn has_pending_updates(&mut self) -> bool {
662 !self.pending_view_events.is_empty() || self.pending.has_updates() || UPDATES.has_pending_updates() || !self.receiver.is_empty()
663 }
664
665 pub(crate) fn poll<O: AppEventObserver>(&mut self, wait_app_event: bool, observer: &mut O) -> AppControlFlow {
666 #[cfg(feature = "dyn_app_extension")]
667 let mut observer = observer.as_dyn();
668 #[cfg(feature = "dyn_app_extension")]
669 let observer = &mut observer;
670 self.poll_impl(wait_app_event, observer)
671 }
672 fn poll_impl<O: AppEventObserver>(&mut self, wait_app_event: bool, observer: &mut O) -> AppControlFlow {
673 let mut disconnected = false;
674
675 if self.exited {
676 return AppControlFlow::Exit;
677 }
678
679 if wait_app_event {
680 let idle = tracing::debug_span!("<idle>", ended_by = tracing::field::Empty).entered();
681
682 let timer = if self.view_is_busy() { None } else { self.loop_timer.poll() };
683 if let Some(time) = timer {
684 match self.receiver.recv_deadline_sp(time) {
685 Ok(ev) => {
686 idle.record("ended_by", "event");
687 drop(idle);
688 self.push_coalesce(ev, observer)
689 }
690 Err(e) => match e {
691 flume::RecvTimeoutError::Timeout => {
692 idle.record("ended_by", "timeout");
693 }
694 flume::RecvTimeoutError::Disconnected => {
695 idle.record("ended_by", "disconnected");
696 disconnected = true
697 }
698 },
699 }
700 } else {
701 match self.receiver.recv() {
702 Ok(ev) => {
703 idle.record("ended_by", "event");
704 drop(idle);
705 self.push_coalesce(ev, observer)
706 }
707 Err(e) => match e {
708 flume::RecvError::Disconnected => {
709 idle.record("ended_by", "disconnected");
710 disconnected = true
711 }
712 },
713 }
714 }
715 }
716 loop {
717 match self.receiver.try_recv() {
718 Ok(ev) => self.push_coalesce(ev, observer),
719 Err(e) => match e {
720 flume::TryRecvError::Empty => break,
721 flume::TryRecvError::Disconnected => {
722 disconnected = true;
723 break;
724 }
725 },
726 }
727 }
728 if disconnected {
729 panic!("app events channel disconnected");
730 }
731
732 if self.view_is_busy() {
733 return AppControlFlow::Wait;
734 }
735
736 UPDATES.on_app_awake();
737
738 let updated_timers = self.loop_timer.awake();
740 if updated_timers {
741 UPDATES.update_timers(&mut self.loop_timer);
743 self.apply_updates(observer);
744 }
745
746 let mut events = mem::take(&mut self.pending_view_events);
747 for ev in events.drain(..) {
748 self.on_view_event(ev, observer);
749 self.apply_updates(observer);
750 }
751 debug_assert!(self.pending_view_events.is_empty());
752 self.pending_view_events = events; let mut events = mem::take(&mut self.pending_view_frame_events);
755 for ev in events.drain(..) {
756 self.on_view_rendered_event(ev, observer);
757 }
758 self.pending_view_frame_events = events;
759
760 if self.has_pending_updates() {
761 self.apply_updates(observer);
762 self.apply_update_events(observer);
763 }
764
765 if self.view_is_busy() {
766 return AppControlFlow::Wait;
767 }
768
769 self.finish_frame(observer);
770
771 UPDATES.next_deadline(&mut self.loop_timer);
772
773 if self.extensions.0.exit() {
774 UPDATES.on_app_sleep();
775 self.exited = true;
776 AppControlFlow::Exit
777 } else if self.has_pending_updates() || UPDATES.has_pending_layout_or_render() {
778 AppControlFlow::Poll
779 } else {
780 UPDATES.on_app_sleep();
781 AppControlFlow::Wait
782 }
783 }
784
785 fn apply_updates<O: AppEventObserver>(&mut self, observer: &mut O) {
787 let _s = tracing::debug_span!("apply_updates").entered();
788
789 let mut run = true;
790 while run {
791 run = self.loop_monitor.update(|| {
792 let mut any = false;
793
794 self.pending |= UPDATES.apply_info();
795 if mem::take(&mut self.pending.info) {
796 any = true;
797 let _s = tracing::debug_span!("info").entered();
798
799 let mut info_widgets = mem::take(&mut self.pending.info_widgets);
800
801 let _t = INSTANT_APP.pause_for_update();
802
803 {
804 let _s = tracing::debug_span!("ext.info").entered();
805 self.extensions.info(&mut info_widgets);
806 }
807 {
808 let _s = tracing::debug_span!("obs.info").entered();
809 observer.info(&mut info_widgets);
810 }
811 }
812
813 self.pending |= UPDATES.apply_updates();
814 TimersService::notify();
815 if mem::take(&mut self.pending.update) {
816 any = true;
817 let _s = tracing::debug_span!("update").entered();
818
819 let mut update_widgets = mem::take(&mut self.pending.update_widgets);
820
821 let _t = INSTANT_APP.pause_for_update();
822
823 {
824 let _s = tracing::debug_span!("ext.update_preview").entered();
825 self.extensions.update_preview();
826 }
827 {
828 let _s = tracing::debug_span!("obs.update_preview").entered();
829 observer.update_preview();
830 }
831 UPDATES.on_pre_updates();
832
833 {
834 let _s = tracing::debug_span!("ext.update_ui").entered();
835 self.extensions.update_ui(&mut update_widgets);
836 }
837 {
838 let _s = tracing::debug_span!("obs.update_ui").entered();
839 observer.update_ui(&mut update_widgets);
840 }
841
842 {
843 let _s = tracing::debug_span!("ext.update").entered();
844 self.extensions.update();
845 }
846 {
847 let _s = tracing::debug_span!("obs.update").entered();
848 observer.update();
849 }
850 UPDATES.on_updates();
851 }
852
853 any
854 });
855 }
856 }
857
858 fn apply_update_events<O: AppEventObserver>(&mut self, observer: &mut O) {
860 let _s = tracing::debug_span!("apply_update_events").entered();
861
862 loop {
863 let events: Vec<_> = self.pending.events.drain(..).collect();
864 if events.is_empty() {
865 break;
866 }
867 for mut update in events {
868 let _s = tracing::debug_span!("update_event", ?update).entered();
869
870 self.loop_monitor.maybe_trace(|| {
871 let _t = INSTANT_APP.pause_for_update();
872
873 {
874 let _s = tracing::debug_span!("ext.event_preview").entered();
875 self.extensions.event_preview(&mut update);
876 }
877 {
878 let _s = tracing::debug_span!("obs.event_preview").entered();
879 observer.event_preview(&mut update);
880 }
881 update.call_pre_actions();
882
883 {
884 let _s = tracing::debug_span!("ext.event_ui").entered();
885 self.extensions.event_ui(&mut update);
886 }
887 {
888 let _s = tracing::debug_span!("obs.event_ui").entered();
889 observer.event_ui(&mut update);
890 }
891 {
892 let _s = tracing::debug_span!("ext.event").entered();
893 self.extensions.event(&mut update);
894 }
895 {
896 let _s = tracing::debug_span!("obs.event").entered();
897 observer.event(&mut update);
898 }
899 update.call_pos_actions();
900 });
901
902 self.apply_updates(observer);
903 }
904 }
905 }
906
907 fn view_is_busy(&mut self) -> bool {
908 VIEW_PROCESS.is_available() && VIEW_PROCESS.pending_frames() > 0
909 }
910
911 fn finish_frame<O: AppEventObserver>(&mut self, observer: &mut O) {
913 debug_assert!(!self.view_is_busy());
914
915 self.pending |= UPDATES.apply_layout_render();
916
917 while mem::take(&mut self.pending.layout) {
918 let _s = tracing::debug_span!("apply_layout").entered();
919
920 let mut layout_widgets = mem::take(&mut self.pending.layout_widgets);
921
922 self.loop_monitor.maybe_trace(|| {
923 let _t = INSTANT_APP.pause_for_update();
924
925 {
926 let _s = tracing::debug_span!("ext.layout").entered();
927 self.extensions.layout(&mut layout_widgets);
928 }
929 {
930 let _s = tracing::debug_span!("obs.layout").entered();
931 observer.layout(&mut layout_widgets);
932 }
933 });
934
935 self.apply_updates(observer);
936 self.pending |= UPDATES.apply_layout_render();
937 }
938
939 if mem::take(&mut self.pending.render) {
940 let _s = tracing::debug_span!("apply_render").entered();
941
942 let mut render_widgets = mem::take(&mut self.pending.render_widgets);
943 let mut render_update_widgets = mem::take(&mut self.pending.render_update_widgets);
944
945 let _t = INSTANT_APP.pause_for_update();
946
947 {
948 let _s = tracing::debug_span!("ext.render").entered();
949 self.extensions.render(&mut render_widgets, &mut render_update_widgets);
950 }
951 {
952 let _s = tracing::debug_span!("obs.render").entered();
953 observer.render(&mut render_widgets, &mut render_update_widgets);
954 }
955 }
956
957 self.loop_monitor.finish_frame();
958 }
959}
960impl<E: AppExtension> Drop for RunningApp<E> {
961 fn drop(&mut self) {
962 let _s = tracing::debug_span!("ext.deinit").entered();
963 self.extensions.deinit();
964 VIEW_PROCESS.exit();
965 }
966}
967
968pub struct AppStartArgs {
973 _private: (),
974}
975
976pub fn on_app_start(handler: impl FnMut(&AppStartArgs) + Send + 'static) {
982 zng_unique_id::hot_static_ref!(ON_APP_START).lock().push(Box::new(handler))
983}
984zng_unique_id::hot_static! {
985 static ON_APP_START: Mutex<Vec<AppStartHandler>> = Mutex::new(vec![]);
986}
987type AppStartHandler = Box<dyn FnMut(&AppStartArgs) + Send + 'static>;
988
989#[derive(Debug)]
991pub(crate) struct LoopTimer {
992 now: DInstant,
993 deadline: Option<Deadline>,
994}
995impl Default for LoopTimer {
996 fn default() -> Self {
997 Self {
998 now: INSTANT.now(),
999 deadline: None,
1000 }
1001 }
1002}
1003impl LoopTimer {
1004 pub fn elapsed(&mut self, deadline: Deadline) -> bool {
1007 if deadline.0 <= self.now {
1008 true
1009 } else {
1010 self.register(deadline);
1011 false
1012 }
1013 }
1014
1015 pub fn register(&mut self, deadline: Deadline) {
1017 if let Some(d) = &mut self.deadline {
1018 if deadline < *d {
1019 *d = deadline;
1020 }
1021 } else {
1022 self.deadline = Some(deadline)
1023 }
1024 }
1025
1026 pub(crate) fn poll(&mut self) -> Option<Deadline> {
1028 self.deadline
1029 }
1030
1031 pub(crate) fn awake(&mut self) -> bool {
1033 self.now = INSTANT.now();
1034 if let Some(d) = self.deadline {
1035 if d.0 <= self.now {
1036 self.deadline = None;
1037 return true;
1038 }
1039 }
1040 false
1041 }
1042
1043 pub fn now(&self) -> DInstant {
1045 self.now
1046 }
1047}
1048impl zng_var::animation::AnimationTimer for LoopTimer {
1049 fn elapsed(&mut self, deadline: Deadline) -> bool {
1050 self.elapsed(deadline)
1051 }
1052
1053 fn register(&mut self, deadline: Deadline) {
1054 self.register(deadline)
1055 }
1056
1057 fn now(&self) -> DInstant {
1058 self.now()
1059 }
1060}
1061
1062#[derive(Default)]
1063struct LoopMonitor {
1064 update_count: u16,
1065 skipped: bool,
1066 trace: Vec<UpdateTrace>,
1067}
1068impl LoopMonitor {
1069 pub fn update(&mut self, update_once: impl FnOnce() -> bool) -> bool {
1071 self.update_count += 1;
1072
1073 if self.update_count < 500 {
1074 update_once()
1075 } else if self.update_count < 1000 {
1076 UpdatesTrace::collect_trace(&mut self.trace, update_once)
1077 } else if self.update_count == 1000 {
1078 self.skipped = true;
1079 let trace = UpdatesTrace::format_trace(mem::take(&mut self.trace));
1080 tracing::error!(
1081 "updated 1000 times without rendering, probably stuck in an infinite loop\n\
1082 will start skipping updates to render and poll system events\n\
1083 top 20 most frequent update requests (in 500 cycles):\n\
1084 {trace}\n\
1085 you can use `UpdatesTraceUiNodeExt` and `updates_trace_event` to refine the trace"
1086 );
1087 false
1088 } else if self.update_count == 1500 {
1089 self.update_count = 1001;
1090 false
1091 } else {
1092 update_once()
1093 }
1094 }
1095
1096 pub fn maybe_trace(&mut self, notify_once: impl FnOnce()) {
1097 if (500..1000).contains(&self.update_count) {
1098 UpdatesTrace::collect_trace(&mut self.trace, notify_once);
1099 } else {
1100 notify_once();
1101 }
1102 }
1103
1104 pub fn finish_frame(&mut self) {
1105 if !self.skipped {
1106 self.skipped = false;
1107 self.update_count = 0;
1108 self.trace = vec![];
1109 }
1110 }
1111}
1112
1113impl APP {
1114 pub fn exit(&self) -> ResponseVar<ExitCancelled> {
1123 APP_PROCESS_SV.write().exit()
1124 }
1125
1126 pub fn is_suspended(&self) -> ReadOnlyArcVar<bool> {
1134 APP_PROCESS_SV.read().is_suspended.read_only()
1135 }
1136}
1137
1138impl APP {
1142 pub fn pause_time_for_update(&self) -> ArcVar<bool> {
1148 APP_PROCESS_SV.read().pause_time_for_updates.clone()
1149 }
1150
1151 pub fn start_manual_time(&self) {
1159 INSTANT_APP.set_mode(InstantMode::Manual);
1160 INSTANT_APP.set_now(INSTANT.now());
1161 UPDATES.update(None);
1162 }
1163
1164 pub fn advance_manual_time(&self, advance: Duration) {
1175 INSTANT_APP.advance_now(advance);
1176 UPDATES.update(None);
1177 }
1178
1179 pub fn set_manual_time(&self, now: DInstant) {
1188 INSTANT_APP.set_now(now);
1189 UPDATES.update(None);
1190 }
1191
1192 pub fn end_manual_time(&self) {
1194 INSTANT_APP.set_mode(match APP.pause_time_for_update().get() {
1195 true => InstantMode::UpdatePaused,
1196 false => InstantMode::Now,
1197 });
1198 UPDATES.update(None);
1199 }
1200}
1201
1202command! {
1203 pub static EXIT_CMD = {
1207 l10n!: true,
1208 name: "Exit",
1209 info: "Close all windows and exit",
1210 shortcut: shortcut!(Exit),
1211 };
1212}
1213
1214#[derive(Clone, Copy, Debug, PartialEq, Eq)]
1218pub struct ExitCancelled;
1219impl fmt::Display for ExitCancelled {
1220 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1221 write!(f, "exit request cancelled")
1222 }
1223}
1224
1225struct AppIntrinsic {
1226 exit_handle: CommandHandle,
1227 pending_exit: Option<PendingExit>,
1228}
1229struct PendingExit {
1230 handle: EventPropagationHandle,
1231 response: ResponderVar<ExitCancelled>,
1232}
1233impl AppIntrinsic {
1234 pub(super) fn pre_init(
1236 is_headed: bool,
1237 with_renderer: bool,
1238 view_process_exe: PathBuf,
1239 view_process_env: HashMap<Txt, Txt>,
1240 device_events: bool,
1241 ) -> Self {
1242 APP_PROCESS_SV
1243 .read()
1244 .pause_time_for_updates
1245 .hook(|a| {
1246 if !matches!(INSTANT.mode(), zng_time::InstantMode::Manual) {
1247 if *a.value() {
1248 INSTANT_APP.set_mode(InstantMode::UpdatePaused);
1249 } else {
1250 INSTANT_APP.set_mode(InstantMode::Now);
1251 }
1252 }
1253 true
1254 })
1255 .perm();
1256
1257 if is_headed {
1258 debug_assert!(with_renderer);
1259
1260 let view_evs_sender = UPDATES.sender();
1261 VIEW_PROCESS.start(view_process_exe, view_process_env, device_events, false, move |ev| {
1262 let _ = view_evs_sender.send_view_event(ev);
1263 });
1264 } else if with_renderer {
1265 let view_evs_sender = UPDATES.sender();
1266 VIEW_PROCESS.start(view_process_exe, view_process_env, false, true, move |ev| {
1267 let _ = view_evs_sender.send_view_event(ev);
1268 });
1269 }
1270
1271 AppIntrinsic {
1272 exit_handle: EXIT_CMD.subscribe(true),
1273 pending_exit: None,
1274 }
1275 }
1276
1277 pub(super) fn exit(&mut self) -> bool {
1279 if let Some(pending) = self.pending_exit.take() {
1280 if pending.handle.is_stopped() {
1281 pending.response.respond(ExitCancelled);
1282 false
1283 } else {
1284 true
1285 }
1286 } else {
1287 false
1288 }
1289 }
1290}
1291impl AppExtension for AppIntrinsic {
1292 fn event_preview(&mut self, update: &mut EventUpdate) {
1293 if let Some(args) = EXIT_CMD.on(update) {
1294 args.handle_enabled(&self.exit_handle, |_| {
1295 APP.exit();
1296 });
1297 }
1298 }
1299
1300 fn update(&mut self) {
1301 if let Some(response) = APP_PROCESS_SV.write().take_requests() {
1302 let args = ExitRequestedArgs::now();
1303 self.pending_exit = Some(PendingExit {
1304 handle: args.propagation().clone(),
1305 response,
1306 });
1307 EXIT_REQUESTED_EVENT.notify(args);
1308 }
1309 }
1310}
1311
1312pub(crate) fn assert_not_view_process() {
1313 if zng_view_api::ViewConfig::from_env().is_some() {
1314 panic!("cannot start App in view-process");
1315 }
1316}
1317
1318#[cfg(feature = "deadlock_detection")]
1319pub(crate) fn check_deadlock() {
1320 use parking_lot::deadlock;
1321 use std::{
1322 sync::atomic::{self, AtomicBool},
1323 thread,
1324 time::*,
1325 };
1326
1327 static CHECK_RUNNING: AtomicBool = AtomicBool::new(false);
1328
1329 if CHECK_RUNNING.swap(true, atomic::Ordering::SeqCst) {
1330 return;
1331 }
1332
1333 thread::spawn(|| {
1334 loop {
1335 thread::sleep(Duration::from_secs(10));
1336
1337 let deadlocks = deadlock::check_deadlock();
1338 if deadlocks.is_empty() {
1339 continue;
1340 }
1341
1342 use std::fmt::Write;
1343 let mut msg = String::new();
1344
1345 let _ = writeln!(&mut msg, "{} deadlocks detected", deadlocks.len());
1346 for (i, threads) in deadlocks.iter().enumerate() {
1347 let _ = writeln!(&mut msg, "Deadlock #{}, {} threads", i, threads.len());
1348 for t in threads {
1349 let _ = writeln!(&mut msg, "Thread Id {:#?}", t.thread_id());
1350 let _ = writeln!(&mut msg, "{:#?}", t.backtrace());
1351 }
1352 }
1353
1354 #[cfg(not(feature = "test_util"))]
1355 eprint!("{msg}");
1356
1357 #[cfg(feature = "test_util")]
1358 {
1359 use std::io::Write;
1362 let _ = write!(&mut std::io::stderr(), "{msg}");
1363 zng_env::exit(-1);
1364 }
1365 }
1366 });
1367}
1368#[cfg(not(feature = "deadlock_detection"))]
1369pub(crate) fn check_deadlock() {}
1370
1371app_local! {
1372 pub(super) static APP_PROCESS_SV: AppProcessService = AppProcessService {
1373 exit_requests: None,
1374 extensions: None,
1375 device_events: false,
1376 pause_time_for_updates: zng_var::var(true),
1377 is_suspended: zng_var::var(false),
1378 };
1379}
1380
1381pub(super) struct AppProcessService {
1382 exit_requests: Option<ResponderVar<ExitCancelled>>,
1383 extensions: Option<Arc<AppExtensionsInfo>>,
1384 pub(super) device_events: bool,
1385 pause_time_for_updates: ArcVar<bool>,
1386 is_suspended: ArcVar<bool>,
1387}
1388impl AppProcessService {
1389 pub(super) fn take_requests(&mut self) -> Option<ResponderVar<ExitCancelled>> {
1390 self.exit_requests.take()
1391 }
1392
1393 fn exit(&mut self) -> ResponseVar<ExitCancelled> {
1394 if let Some(r) = &self.exit_requests {
1395 r.response_var()
1396 } else {
1397 let (responder, response) = response_var();
1398 self.exit_requests = Some(responder);
1399 UPDATES.update(None);
1400 response
1401 }
1402 }
1403
1404 pub(super) fn extensions(&self) -> Arc<AppExtensionsInfo> {
1405 self.extensions
1406 .clone()
1407 .unwrap_or_else(|| Arc::new(AppExtensionsInfo { infos: vec![] }))
1408 }
1409
1410 pub(super) fn set_extensions(&mut self, info: AppExtensionsInfo, device_events: bool) {
1411 self.extensions = Some(Arc::new(info));
1412 self.device_events = device_events;
1413 }
1414}
1415
1416#[derive(Debug)]
1418pub(crate) enum AppEvent {
1419 ViewEvent(zng_view_api::Event),
1421 Event(crate::event::EventUpdateMsg),
1423 Update(UpdateOp, Option<WidgetId>),
1425 ResumeUnwind(PanicPayload),
1427 CheckUpdate,
1429}
1430
1431#[derive(Clone)]
1437pub struct AppEventSender(flume::Sender<AppEvent>);
1438impl AppEventSender {
1439 pub(crate) fn new() -> (Self, flume::Receiver<AppEvent>) {
1440 let (sender, receiver) = flume::unbounded();
1441 (Self(sender), receiver)
1442 }
1443
1444 #[allow(clippy::result_large_err)] fn send_app_event(&self, event: AppEvent) -> Result<(), AppDisconnected<AppEvent>> {
1446 self.0.send(event)?;
1447 Ok(())
1448 }
1449
1450 #[allow(clippy::result_large_err)]
1451 fn send_view_event(&self, event: zng_view_api::Event) -> Result<(), AppDisconnected<AppEvent>> {
1452 self.0.send(AppEvent::ViewEvent(event))?;
1453 Ok(())
1454 }
1455
1456 pub fn send_update(&self, op: UpdateOp, target: impl Into<Option<WidgetId>>) -> Result<(), AppDisconnected<()>> {
1458 UpdatesTrace::log_update();
1459 self.send_app_event(AppEvent::Update(op, target.into()))
1460 .map_err(|_| AppDisconnected(()))
1461 }
1462
1463 pub(crate) fn send_event(&self, event: crate::event::EventUpdateMsg) -> Result<(), AppDisconnected<crate::event::EventUpdateMsg>> {
1465 self.send_app_event(AppEvent::Event(event)).map_err(|e| match e.0 {
1466 AppEvent::Event(ev) => AppDisconnected(ev),
1467 _ => unreachable!(),
1468 })
1469 }
1470
1471 pub fn send_resume_unwind(&self, payload: PanicPayload) -> Result<(), AppDisconnected<PanicPayload>> {
1473 self.send_app_event(AppEvent::ResumeUnwind(payload)).map_err(|e| match e.0 {
1474 AppEvent::ResumeUnwind(p) => AppDisconnected(p),
1475 _ => unreachable!(),
1476 })
1477 }
1478
1479 pub(crate) fn send_check_update(&self) -> Result<(), AppDisconnected<()>> {
1481 self.send_app_event(AppEvent::CheckUpdate).map_err(|_| AppDisconnected(()))
1482 }
1483
1484 pub fn waker(&self, target: impl Into<Option<WidgetId>>) -> Waker {
1486 Arc::new(AppWaker(self.0.clone(), target.into())).into()
1487 }
1488
1489 pub fn ext_channel<T>(&self) -> (AppExtSender<T>, AppExtReceiver<T>) {
1491 let (sender, receiver) = flume::unbounded();
1492
1493 (
1494 AppExtSender {
1495 update: self.clone(),
1496 sender,
1497 },
1498 AppExtReceiver { receiver },
1499 )
1500 }
1501
1502 pub fn ext_channel_bounded<T>(&self, cap: usize) -> (AppExtSender<T>, AppExtReceiver<T>) {
1504 let (sender, receiver) = flume::bounded(cap);
1505
1506 (
1507 AppExtSender {
1508 update: self.clone(),
1509 sender,
1510 },
1511 AppExtReceiver { receiver },
1512 )
1513 }
1514}
1515
1516struct AppWaker(flume::Sender<AppEvent>, Option<WidgetId>);
1517impl std::task::Wake for AppWaker {
1518 fn wake(self: std::sync::Arc<Self>) {
1519 self.wake_by_ref()
1520 }
1521 fn wake_by_ref(self: &Arc<Self>) {
1522 let _ = self.0.send(AppEvent::Update(UpdateOp::Update, self.1));
1523 }
1524}
1525
1526type PanicPayload = Box<dyn std::any::Any + Send + 'static>;
1527
1528pub struct AppExtSender<T> {
1532 update: AppEventSender,
1533 sender: flume::Sender<T>,
1534}
1535impl<T> Clone for AppExtSender<T> {
1536 fn clone(&self) -> Self {
1537 Self {
1538 update: self.update.clone(),
1539 sender: self.sender.clone(),
1540 }
1541 }
1542}
1543impl<T: Send> AppExtSender<T> {
1544 pub fn send(&self, msg: T) -> Result<(), AppDisconnected<T>> {
1546 match self.update.send_update(UpdateOp::Update, None) {
1547 Ok(()) => self.sender.send(msg).map_err(|e| AppDisconnected(e.0)),
1548 Err(_) => Err(AppDisconnected(msg)),
1549 }
1550 }
1551
1552 pub fn send_timeout(&self, msg: T, dur: Duration) -> Result<(), TimeoutOrAppDisconnected> {
1554 match self.update.send_update(UpdateOp::Update, None) {
1555 Ok(()) => self.sender.send_timeout(msg, dur).map_err(|e| match e {
1556 flume::SendTimeoutError::Timeout(_) => TimeoutOrAppDisconnected::Timeout,
1557 flume::SendTimeoutError::Disconnected(_) => TimeoutOrAppDisconnected::AppDisconnected,
1558 }),
1559 Err(_) => Err(TimeoutOrAppDisconnected::AppDisconnected),
1560 }
1561 }
1562
1563 pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), TimeoutOrAppDisconnected> {
1565 match self.update.send_update(UpdateOp::Update, None) {
1566 Ok(()) => self.sender.send_deadline(msg, deadline).map_err(|e| match e {
1567 flume::SendTimeoutError::Timeout(_) => TimeoutOrAppDisconnected::Timeout,
1568 flume::SendTimeoutError::Disconnected(_) => TimeoutOrAppDisconnected::AppDisconnected,
1569 }),
1570 Err(_) => Err(TimeoutOrAppDisconnected::AppDisconnected),
1571 }
1572 }
1573}
1574
1575pub struct AppExtReceiver<T> {
1579 receiver: flume::Receiver<T>,
1580}
1581impl<T> Clone for AppExtReceiver<T> {
1582 fn clone(&self) -> Self {
1583 Self {
1584 receiver: self.receiver.clone(),
1585 }
1586 }
1587}
1588impl<T> AppExtReceiver<T> {
1589 pub fn try_recv(&self) -> Result<T, Option<AppExtSenderDisconnected>> {
1594 self.receiver.try_recv().map_err(|e| match e {
1595 flume::TryRecvError::Empty => None,
1596 flume::TryRecvError::Disconnected => Some(AppExtSenderDisconnected),
1597 })
1598 }
1599}
1600
1601#[derive(Debug)]
1605pub struct AppExtSenderDisconnected;
1606impl fmt::Display for AppExtSenderDisconnected {
1607 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1608 write!(f, "cannot receive because the sender disconnected")
1609 }
1610}
1611impl std::error::Error for AppExtSenderDisconnected {}
1612
1613event_args! {
1614 pub struct ExitRequestedArgs {
1618
1619 ..
1620
1621 fn delivery_list(&self, list: &mut UpdateDeliveryList) {
1623 list.search_all()
1624 }
1625 }
1626}
1627
1628event! {
1629 pub static EXIT_REQUESTED_EVENT: ExitRequestedArgs;
1637}
1638
1639trait ReceiverExt<T> {
1641 fn recv_deadline_sp(&self, deadline: Deadline) -> Result<T, flume::RecvTimeoutError>;
1643}
1644
1645const WORST_SLEEP_ERR: Duration = Duration::from_millis(if cfg!(windows) { 20 } else { 10 });
1646const WORST_SPIN_ERR: Duration = Duration::from_millis(if cfg!(windows) { 2 } else { 1 });
1647
1648impl<T> ReceiverExt<T> for flume::Receiver<T> {
1649 fn recv_deadline_sp(&self, deadline: Deadline) -> Result<T, flume::RecvTimeoutError> {
1650 loop {
1651 if let Some(d) = deadline.0.checked_duration_since(INSTANT.now()) {
1652 if matches!(INSTANT.mode(), zng_time::InstantMode::Manual) {
1653 match self.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
1656 Err(flume::RecvTimeoutError::Timeout) => continue, interrupt => return interrupt,
1658 }
1659 } else if d > WORST_SLEEP_ERR {
1660 #[cfg(not(target_arch = "wasm32"))]
1662 match self.recv_deadline(deadline.0.checked_sub(WORST_SLEEP_ERR).unwrap().into()) {
1663 Err(flume::RecvTimeoutError::Timeout) => continue, interrupt => return interrupt,
1665 }
1666
1667 #[cfg(target_arch = "wasm32")] match self.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
1669 Err(flume::RecvTimeoutError::Timeout) => continue, interrupt => return interrupt,
1671 }
1672 } else if d > WORST_SPIN_ERR {
1673 let spin_deadline = Deadline(deadline.0.checked_sub(WORST_SPIN_ERR).unwrap());
1674
1675 while !spin_deadline.has_elapsed() {
1677 match self.try_recv() {
1678 Err(flume::TryRecvError::Empty) => std::thread::yield_now(),
1679 Err(flume::TryRecvError::Disconnected) => return Err(flume::RecvTimeoutError::Disconnected),
1680 Ok(msg) => return Ok(msg),
1681 }
1682 }
1683 continue; } else {
1685 while !deadline.has_elapsed() {
1687 std::thread::yield_now();
1688 }
1689 return Err(flume::RecvTimeoutError::Timeout);
1690 }
1691 } else {
1692 return Err(flume::RecvTimeoutError::Timeout);
1693 }
1694 }
1695 }
1696}