zng_view_api/
app_process.rs

1use std::{
2    collections::HashMap,
3    panic,
4    path::{Path, PathBuf},
5    sync::Arc,
6    thread::{self, JoinHandle},
7    time::Instant,
8};
9
10use std::time::Duration;
11
12use parking_lot::Mutex;
13use zng_task::channel::ChannelError;
14use zng_txt::Txt;
15
16use crate::{
17    AnyResult, Event, Request, Response, ViewConfig, ViewProcessGen, VpResult,
18    ipc::{self, EventReceiver},
19};
20
/// Join handle of the thread that listens for view-process events.
///
/// The listener returns the `on_event` closure on join so it can be reused by the listener of a respawned view-process.
type EventListenerJoin = JoinHandle<Box<dyn FnMut(Event) + Send>>;
23
/// Name of the environment variable set on the spawned view-process with the API version of this crate.
pub(crate) const VIEW_VERSION: &str = "ZNG_VIEW_VERSION";
/// Name of the environment variable set on the spawned view-process with the name of the IPC connection to join.
pub(crate) const VIEW_SERVER: &str = "ZNG_VIEW_SERVER";
/// Name of the environment variable set on the spawned view-process with the mode, `"headless"` or `"headed"`.
pub(crate) const VIEW_MODE: &str = "ZNG_VIEW_MODE";
27
/// Connection state of the view-process as tracked by the [`Controller`].
#[derive(Clone, Copy)]
enum ViewState {
    /// Initial state and state during a respawn, the view-process has not signaled that it is inited yet.
    NotRunning,
    /// The view-process signaled inited, requests that must be connected can be sent.
    RunningAndConnected,
    /// The view-process signaled suspended, it becomes connected again on the next inited signal.
    Suspended,
}
34
/// View Process controller, used in the App Process.
///
/// # Exit
///
/// The View Process is [killed] when the controller is dropped, if the app is running in same process mode
/// then the current process [exits] with code 0 on drop.
///
/// In multi-process mode the View Process is also killed to respawn if it does not send any event for the duration
/// of the `"ZNG_VIEW_TIMEOUT"` (10 seconds by default), the app must call [`Controller::ping`] periodically to
/// generate the [`Event::Pong`] to detect availability.
///
/// [killed]: std::process::Child::kill
/// [exits]: std::process::exit
#[cfg_attr(not(ipc), allow(unused))]
pub struct Controller {
    // Child view-process, `None` in same process mode or while a respawn is taking it.
    // The flag is set to `true` when the listener thread kills the process for not responding.
    process: Arc<Mutex<Option<(std::process::Child, bool)>>>,
    // Connection state, updated by the `handle_*` methods and by respawn.
    view_state: ViewState,
    // Generation of the current view-process, advances on every respawn.
    generation: ViewProcessGen,
    // If the current view-process was started by a respawn.
    is_respawn: bool,
    // Executable and extra environment used to spawn (and respawn) the view-process.
    view_process_exe: PathBuf,
    view_process_env: HashMap<Txt, Txt>,
    // IPC channels to the current view-process, replaced on respawn.
    request_sender: ipc::RequestSender,
    response_receiver: ipc::ResponseReceiver,
    // Listener thread handle, joined on respawn to recover the `on_event` closure.
    event_listener: Option<EventListenerJoin>,
    headless: bool,
    same_process: bool,
    // Instant of the last crash respawn and count of crash respawns that happened in quick succession.
    last_respawn: Option<Instant>,
    fast_respawn_count: u8,
}
// Compile-time check that `Controller` is `Send + Sync`, never called.
#[cfg(test)]
fn _assert_sync(x: Controller) -> impl Send + Sync {
    x
}
67impl Controller {
68    /// Start with a custom view process.
69    ///
70    /// The `view_process_exe` must be an executable that starts a view server.
71    /// Note that the [`VERSION`] of this crate must match in both executables.
72    ///
73    /// The `view_process_env` can be set to any env var needed to start the view-process. Note that if `view_process_exe`
74    /// is the current executable this most likely need set `zng_env::PROCESS_MAIN`.
75    ///
76    /// The `on_event` closure is called in another thread every time the app receives an event.
77    ///
78    /// # Tests
79    ///
80    /// The [`current_exe`] cannot be used in tests, you should set an external view-process executable. Unfortunately there
81    /// is no way to check if `start` was called in a test so we cannot provide an error message for this.
82    /// If the test is hanging in debug builds or has a timeout error in release builds this is probably the reason.
83    ///
84    /// # Connect Timeout
85    ///
86    /// If the view process takes longer than 10 seconds to connect it is considered failed and a respawn will be attempted.
87    /// This timeout is very reasonable in most cases, specially since users definitely need some visual feedback sooner, but
88    /// some test runner machines can be very slow. You can can set the `"ZNG_VIEW_TIMEOUT"` variable to a custom timeout in
89    /// seconds. The minimum value is 5 seconds. This timeout value is also used to define a *not responding* respawn.
90    ///
91    /// [`current_exe`]: std::env::current_exe
92    /// [`VERSION`]: crate::VERSION
93    pub fn start<F>(view_process_exe: PathBuf, view_process_env: HashMap<Txt, Txt>, headless: bool, on_event: F) -> Self
94    where
95        F: FnMut(Event) + Send + 'static,
96    {
97        Self::start_impl(view_process_exe, view_process_env, headless, Box::new(on_event))
98    }
99    fn start_impl(
100        view_process_exe: PathBuf,
101        view_process_env: HashMap<Txt, Txt>,
102        headless: bool,
103        on_event: Box<dyn FnMut(Event) + Send>,
104    ) -> Self {
105        if ViewConfig::from_env().is_some() {
106            panic!("cannot start Controller in process configured to be view-process");
107        }
108
109        let (process, request_sender, response_receiver, event_receiver) =
110            Self::spawn_view_process(&view_process_exe, &view_process_env, headless).expect("failed to spawn or connect to view-process");
111        let same_process = process.is_none();
112        let process = Arc::new(Mutex::new(process.map(|p| (p, false))));
113        let ev = if same_process {
114            Self::spawn_same_process_listener(on_event, event_receiver, ViewProcessGen::first())
115        } else {
116            Self::spawn_other_process_listener(on_event, event_receiver, process.clone(), ViewProcessGen::first())
117        };
118
119        let mut c = Controller {
120            same_process,
121            view_state: ViewState::NotRunning,
122            process,
123            view_process_exe,
124            view_process_env,
125            request_sender,
126            response_receiver,
127            event_listener: Some(ev),
128            headless,
129            generation: ViewProcessGen::INVALID,
130            is_respawn: false,
131            last_respawn: None,
132            fast_respawn_count: 0,
133        };
134
135        if let Err(ChannelError::Disconnected { .. }) = c.try_init() {
136            panic!("respawn on init");
137        }
138
139        c
140    }
141    fn spawn_same_process_listener(
142        mut on_event: Box<dyn FnMut(Event) + Send>,
143        mut event_receiver: EventReceiver,
144        generation: ViewProcessGen,
145    ) -> std::thread::JoinHandle<Box<dyn FnMut(Event) + Send>> {
146        thread::Builder::new()
147            .name("same_process_listener".into())
148            .spawn(move || {
149                while let Ok(ev) = event_receiver.recv() {
150                    on_event(ev);
151                }
152                on_event(Event::Disconnected(generation));
153
154                // return to reuse in respawn.
155                on_event
156            })
157            .expect("failed to spawn thread")
158    }
159    fn spawn_other_process_listener(
160        mut on_event: Box<dyn FnMut(Event) + Send>,
161        mut event_receiver: EventReceiver,
162        process: Arc<Mutex<Option<(std::process::Child, bool)>>>,
163        generation: ViewProcessGen,
164    ) -> std::thread::JoinHandle<Box<dyn FnMut(Event) + Send>> {
165        // spawns a thread that receives view-process events and monitors for process responsiveness
166        // - ipc-channel sometimes does not signal disconnect when the view-process dies, this monitors the process state every second.
167        // - app-process pings every 2s of inactivity, this kills the view-process it it does not respond for more them ZNG_VIEW_TIMEOUT.
168        thread::Builder::new()
169            .name("other_process_listener".into())
170            .spawn(move || {
171                const PROCESS_CHECK_DUR: Duration = Duration::from_secs(1);
172                let timeout = view_timeout();
173                let mut check_count = 0u64;
174                loop {
175                    match event_receiver.recv_timeout(PROCESS_CHECK_DUR) {
176                        Ok(ev) => {
177                            check_count = 0;
178                            on_event(ev)
179                        }
180                        Err(ChannelError::Timeout) => {
181                            if let Some(p) = &mut *process.lock() {
182                                match p.0.try_wait() {
183                                    Ok(c) => {
184                                        if c.is_some() {
185                                            // view-process died
186                                            break;
187                                        } else {
188                                            check_count += 1;
189                                            if check_count == timeout {
190                                                tracing::error!("view-process not responding for {timeout}s, will respawn");
191                                                let _ = p.0.kill();
192                                                p.1 = true;
193                                                break;
194                                            }
195                                        }
196                                    }
197                                    Err(e) => {
198                                        if e.kind() != std::io::ErrorKind::Interrupted {
199                                            tracing::error!("view-process try_wait error after inactivity, {e}");
200                                            break;
201                                        }
202                                    }
203                                }
204                            } else {
205                                // respawning already
206                                break;
207                            }
208                        }
209                        Err(_) => break,
210                    }
211                }
212                on_event(Event::Disconnected(generation));
213
214                // return to reuse in respawn.
215                on_event
216            })
217            .expect("failed to spawn thread")
218    }
219
    /// Send the init request for the current generation, respawn flag and headless mode, discarding the response value.
    fn try_init(&mut self) -> VpResult<()> {
        self.init(self.generation, self.is_respawn, self.headless)?;
        Ok(())
    }
224
    /// View-process is running, connected and ready to respond.
    ///
    /// This is `true` only after an inited event is handled and until a suspend or respawn.
    pub fn is_connected(&self) -> bool {
        matches!(self.view_state, ViewState::RunningAndConnected)
    }
229
    /// View-process generation.
    ///
    /// The generation advances every time the view-process is respawned.
    pub fn generation(&self) -> ViewProcessGen {
        self.generation
    }
234
    /// If the view-process was started in headless mode.
    pub fn headless(&self) -> bool {
        self.headless
    }
239
    /// If both view and app are running in the same process.
    ///
    /// This is `true` when no child process was spawned on start.
    pub fn same_process(&self) -> bool {
        self.same_process
    }
244
245    fn try_talk(&mut self, req: Request) -> Result<Response, ChannelError> {
246        self.request_sender.send(req)?;
247        self.response_receiver.recv()
248    }
249    pub(crate) fn talk(&mut self, req: Request) -> VpResult<Response> {
250        debug_assert!(req.expect_response());
251
252        tracing::trace!("talk {req:?}");
253
254        if req.must_be_connected() && !self.is_connected() {
255            tracing::error!("cannot send request {req:?}, not connected");
256            return Err(ChannelError::disconnected());
257        }
258
259        match self.try_talk(req) {
260            Ok(r) => {
261                tracing::trace!("talk {r:?}");
262                Ok(r)
263            }
264            Err(ChannelError::Disconnected { cause }) => {
265                self.handle_disconnect(self.generation);
266                Err(ChannelError::Disconnected { cause })
267            }
268            e => e,
269        }
270    }
271
272    pub(crate) fn command(&mut self, req: Request) -> Result<(), ChannelError> {
273        debug_assert!(!req.expect_response());
274
275        tracing::trace!("command {req:?}");
276
277        if req.must_be_connected() && !self.is_connected() {
278            tracing::error!("cannot send request {req:?}, not connected");
279            return Err(ChannelError::disconnected());
280        }
281
282        match self.request_sender.send(req) {
283            Ok(_) => {
284                tracing::trace!("command ok");
285                Ok(())
286            }
287            Err(ChannelError::Disconnected { cause }) => {
288                self.handle_disconnect(self.generation);
289                Err(ChannelError::Disconnected { cause })
290            }
291            e => e,
292        }
293    }
294
    /// Start the view-process and connect to it.
    ///
    /// If a same process view is awaiting, it is configured to connect and no process is spawned, the returned
    /// child is `None`. Otherwise spawns `view_process_exe` with the `view_process_env` plus the variables that
    /// define the API version, connection name and mode.
    ///
    /// Returns the child process (if any), the request sender, response receiver and event receiver.
    ///
    /// # Errors
    ///
    /// Returns an error if the process cannot be spawned or if the connection fails. On connection failure the
    /// spawned process is killed and its exit code is logged, if the exit code indicates an API version mismatch
    /// the current process exits with the same code.
    fn spawn_view_process(
        view_process_exe: &Path,
        view_process_env: &HashMap<Txt, Txt>,
        headless: bool,
    ) -> AnyResult<(
        Option<std::process::Child>,
        ipc::RequestSender,
        ipc::ResponseReceiver,
        ipc::EventReceiver,
    )> {
        let _span = tracing::trace_span!("spawn_view_process").entered();

        let init = ipc::AppInit::new();

        // create process and spawn it, unless is running in same process mode.
        let process = if ViewConfig::is_awaiting_same_process() {
            ViewConfig::set_same_process(ViewConfig {
                version: crate::VERSION.into(),
                server_name: Txt::from_str(init.name()),
                headless,
            });
            None
        } else {
            #[cfg(not(ipc))]
            {
                // inputs are only used with `ipc`, this avoids unused warnings.
                let _ = (view_process_exe, view_process_env);
                panic!("expected only same_process mode with `ipc` feature disabled");
            }

            #[cfg(ipc)]
            {
                let mut process = std::process::Command::new(view_process_exe);
                for (name, val) in view_process_env {
                    process.env(name, val);
                }
                // controller variables are set last, so they override any user provided value of the same name.
                let process = process
                    .env(VIEW_VERSION, crate::VERSION)
                    .env(VIEW_SERVER, init.name())
                    .env(VIEW_MODE, if headless { "headless" } else { "headed" })
                    .env("RUST_BACKTRACE", "full")
                    .spawn()?;
                Some(process)
            }
        };

        let (req, rsp, ev) = match init.connect() {
            Ok(r) => r,
            Err(e) => {
                // connection failed, cleanup the spawned process (if any) before returning the error.
                #[cfg(ipc)]
                if let Some(mut p) = process {
                    if let Err(ke) = p.kill() {
                        tracing::error!(
                            "failed to kill new view-process after failing to connect to it\n connection error: {e:?}\n kill error: {ke:?}",
                        );
                    } else {
                        match p.wait() {
                            Ok(output) => {
                                let code = output.code();
                                if ViewConfig::is_version_err(code, None) {
                                    let code = code.unwrap_or(1);
                                    tracing::error!(
                                        "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
                                                will exit app-process with code 0x{code:x}"
                                    );
                                    zng_env::exit(code);
                                } else {
                                    tracing::error!("view-process exit code: {}", output.code().unwrap_or(1));
                                }
                            }
                            Err(e) => {
                                tracing::error!("failed to read output status of killed view-process, {e}");
                            }
                        }
                    }
                } else {
                    tracing::error!("failed to connect with same process");
                }
                return Err(e);
            }
        };

        Ok((process, req, rsp, ev))
    }
378
379    /// Handle an [`Event::Inited`].
380    ///
381    /// Set the connected flag to `true`.
382    pub fn handle_inited(&mut self, vp_gen: ViewProcessGen) {
383        match self.view_state {
384            ViewState::NotRunning => {
385                if self.generation == vp_gen {
386                    // crash respawn already sets gen
387                    self.view_state = ViewState::RunningAndConnected;
388                }
389            }
390            ViewState::Suspended => {
391                self.generation = vp_gen;
392                self.view_state = ViewState::RunningAndConnected;
393            }
394            ViewState::RunningAndConnected => {}
395        }
396    }
397
    /// Handle an [`Event::Suspended`].
    ///
    /// Sets the state to suspended, the controller is not connected until the next inited event is handled.
    pub fn handle_suspended(&mut self) {
        self.view_state = ViewState::Suspended;
    }
404
405    /// Handle an [`Event::Disconnected`].
406    ///
407    /// The `gen` parameter is the generation provided by the event. It is used to determinate if the disconnect has
408    /// not been handled already.
409    ///
410    /// Tries to cleanup the old view-process and start a new one, if all is successful an [`Event::Inited`] is send.
411    ///
412    /// The old view-process exit code and std output is logged using the `vp_respawn` target.
413    ///
414    /// Exits the current process with code `1` if the view-process was killed by the user. In Windows this is if
415    /// the view-process exit code is `1`. In Unix if it was killed by SIGKILL, SIGSTOP, SIGINT.
416    ///
417    /// # Panics
418    ///
419    /// If the last five respawns happened all within 500ms of the previous respawn.
420    ///
421    /// If the an error happens three times when trying to spawn the new view-process.
422    ///
423    /// If another disconnect happens during the view-process startup dialog.
424    pub fn handle_disconnect(&mut self, vp_gen: ViewProcessGen) {
425        if vp_gen == self.generation {
426            #[cfg(not(ipc))]
427            {
428                tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
429            }
430
431            #[cfg(ipc)]
432            {
433                self.respawn_impl(true)
434            }
435        } else {
436            tracing::warn!("disconnected event from previous generation ignored")
437        }
438    }
439
    /// Reopen the view-process, causing another [`Event::Inited`].
    ///
    /// This is similar to [`handle_disconnect`] but the old view-process is always killed by the controller, so
    /// the current process does not exit because of a user kill exit code or signal.
    ///
    /// Without the `ipc` build config this only logs an error.
    ///
    /// [`handle_disconnect`]: Controller::handle_disconnect
    pub fn respawn(&mut self) {
        #[cfg(not(ipc))]
        {
            tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
        }

        #[cfg(ipc)]
        self.respawn_impl(false);
    }
    /// Cleanup the current view-process and spawn a new one.
    ///
    /// If `is_crash` the old process is given a chance to exit by itself and its exit status is inspected to
    /// detect a user kill, otherwise it is killed immediately.
    ///
    /// Does nothing if the process handle is not available (same process mode or already taken).
    ///
    /// # Panics
    ///
    /// If `is_crash` and the fast respawn count reaches 2, the count increments when a crash respawn happens
    /// less than one minute after the previous crash respawn.
    ///
    /// If the listener thread panicked, the panic is resumed here.
    ///
    /// If the new view-process fails to spawn three times or disconnects during init.
    #[cfg(ipc)]
    fn respawn_impl(&mut self, is_crash: bool) {
        use zng_unit::TimeUnits;

        self.view_state = ViewState::NotRunning;
        self.is_respawn = true;

        // take the process, the listener thread sees `None` and stops monitoring.
        let (mut process, mut killed_by_us) = if let Some(p) = self.process.lock().take() {
            p
        } else {
            if self.same_process {
                tracing::error!(target: "vp_respawn", "cannot recover in same_process mode");
            }
            return;
        };
        if is_crash {
            tracing::error!(target: "vp_respawn", "channel disconnect, will try respawn");
        }

        // track crash respawns that happen in quick succession to avoid an infinite respawn loop.
        if is_crash {
            let t = Instant::now();
            if let Some(last_respawn) = self.last_respawn {
                if t - last_respawn < Duration::from_secs(60) {
                    self.fast_respawn_count += 1;
                    if self.fast_respawn_count == 2 {
                        panic!("disconnect respawn happened 2 times in less than 1 minute");
                    }
                } else {
                    self.fast_respawn_count = 0;
                }
            }
            self.last_respawn = Some(t);
        } else {
            self.last_respawn = None;
        }

        // try exit
        if !is_crash {
            let _ = process.kill();
            killed_by_us = true;
        } else if !matches!(process.try_wait(), Ok(Some(_))) {
            // if not exited, give the process 300ms to close with the preferred exit code.
            thread::sleep(300.ms());

            if !matches!(process.try_wait(), Ok(Some(_))) {
                // if still not exited, kill it.
                killed_by_us = true;
                let _ = process.kill();
            }
        }

        // blocks until the old process exits, also collects the exit status.
        let exit_status = match process.wait() {
            Ok(c) => Some(c),
            Err(e) => {
                tracing::error!(target: "vp_respawn", "view-process could not be killed, will abandon running, {e:?}");
                None
            }
        };

        // try print stdout/err and exit code.
        if let Some(c) = exit_status {
            tracing::info!(target: "vp_respawn", "view-process killed");

            let code = c.code();
            // only assigned in unix builds.
            #[allow(unused_mut)]
            let mut signal = None::<i32>;

            if !killed_by_us {
                // check if user killed the view-process, in this case we exit too.

                #[cfg(windows)]
                if code == Some(1) {
                    tracing::warn!(target: "vp_respawn", "view-process exit code (1), probably killed by the system, \
                                        will exit app-process with the same code");
                    zng_env::exit(1);
                }

                #[cfg(unix)]
                if code.is_none() {
                    use std::os::unix::process::ExitStatusExt as _;
                    signal = c.signal();

                    // NOTE(review): 2 and 9 are SIGINT and SIGKILL, the other values presumably cover the
                    // SIGSTOP number in different platforms, confirm.
                    if let Some(sig) = signal
                        && [2, 9, 17, 19, 23].contains(&sig)
                    {
                        tracing::warn!(target: "vp_respawn", "view-process exited by signal ({sig}), \
                                            will exit app-process with code 1");
                        zng_env::exit(1);
                    }
                }
            }

            if !killed_by_us {
                let code = code.unwrap_or(0);
                let signal = signal.unwrap_or(0);
                tracing::error!(target: "vp_respawn", "view-process exit code: {code:#X}, signal: {signal}");
            }

            if ViewConfig::is_version_err(code, None) {
                let code = code.unwrap_or(1);
                tracing::error!(target: "vp_respawn", "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
                                        will exit app-process with code 0x{code:x}");
                zng_env::exit(code);
            }
        } else {
            tracing::error!(target: "vp_respawn", "failed to kill view-process, will abandon it running and spawn a new one");
        }

        // recover event listener closure (in a box).
        let on_event = match self.event_listener.take().unwrap().join() {
            Ok(fn_) => fn_,
            Err(p) => panic::resume_unwind(p),
        };

        // respawn
        let mut retries = 3;
        let (new_process, request, response, event_listener) = loop {
            match Self::spawn_view_process(&self.view_process_exe, &self.view_process_env, self.headless) {
                Ok(r) => break r,
                Err(e) => {
                    tracing::error!(target: "vp_respawn", "failed to respawn, {e:?}");
                    retries -= 1;
                    if retries == 0 {
                        panic!("failed to respawn `view-process` after 3 retries");
                    }
                    tracing::info!(target: "vp_respawn", "retrying respawn");
                }
            }
        };

        // update connections
        self.process = Arc::new(Mutex::new(Some((new_process.unwrap(), false))));
        self.request_sender = request;
        self.response_receiver = response;

        let next_id = self.generation.next();
        self.generation = next_id;

        // the new listener reuses the closure recovered from the previous listener.
        let ev = Self::spawn_other_process_listener(on_event, event_listener, self.process.clone(), self.generation);
        self.event_listener = Some(ev);

        // only a disconnect is fatal here, other init errors are ignored.
        if let Err(ChannelError::Disconnected { .. }) = self.try_init() {
            panic!("respawn on respawn startup");
        }
    }
600}
601impl Drop for Controller {
602    /// Kills the View Process, unless it is running in the same process.
603    fn drop(&mut self) {
604        let _ = self.exit();
605        #[cfg(ipc)]
606        if let Some((mut process, _)) = self.process.lock().take()
607            && process.try_wait().is_err()
608        {
609            std::thread::sleep(Duration::from_secs(1));
610            if process.try_wait().is_err() {
611                tracing::error!("view-process did not exit after 1s, killing");
612                let _ = process.kill();
613                let _ = process.wait();
614            }
615        }
616    }
617}
618
619const VIEW_TIMEOUT: &str = "ZNG_VIEW_TIMEOUT";
620/// Timeout in seconds.
621pub(crate) fn view_timeout() -> u64 {
622    match std::env::var(VIEW_TIMEOUT) {
623        Ok(s) if !s.is_empty() => match s.parse::<u64>() {
624            Ok(s) => s.max(5),
625            Err(e) => {
626                if s == "false" {
627                    return u64::MAX;
628                }
629                tracing::error!("invalid {VIEW_TIMEOUT:?} value, {e}");
630                10
631            }
632        },
633        _ => 10,
634    }
635}