1use std::{
2 collections::HashMap,
3 panic,
4 path::{Path, PathBuf},
5 sync::Arc,
6 thread::{self, JoinHandle},
7 time::Instant,
8};
9
10use std::time::Duration;
11
12use parking_lot::Mutex;
13use zng_task::channel::ChannelError;
14use zng_txt::Txt;
15
16use crate::{
17 AnyResult, Event, Request, Response, ViewConfig, ViewProcessGen, VpResult,
18 ipc::{self, EventReceiver},
19};
20
21type EventListenerJoin = JoinHandle<Box<dyn FnMut(Event) + Send>>;
23
/// Env var set on a spawned view-process to `"headless"` or `"headed"`.
pub(crate) const VIEW_MODE: &str = "ZNG_VIEW_MODE";
/// Env var set on a spawned view-process with the name of the app-process IPC init channel.
pub(crate) const VIEW_SERVER: &str = "ZNG_VIEW_SERVER";
/// Env var set on a spawned view-process with the app-process `crate::VERSION`.
pub(crate) const VIEW_VERSION: &str = "ZNG_VIEW_VERSION";
27
/// Connection state of the view-process, as tracked by the controller.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ViewState {
    /// Initial state, and the state right after a respawn, until the current generation reports init.
    NotRunning,
    /// The view-process reported init for the current generation; connected-only requests are allowed.
    RunningAndConnected,
    /// The view-process was suspended; the next init report adopts its generation.
    Suspended,
}
34
/// Controls a view-process: spawns it (or configures same-process mode), sends requests and commands
/// to it, listens to its events on a background thread and, in `ipc` builds, respawns it after a disconnect.
// Some fields are only read by `ipc`-gated code (e.g. `respawn_impl`), so they are unused without `ipc`.
#[cfg_attr(not(ipc), allow(unused))]
pub struct Controller {
    /// The spawned view-process child and a flag set when it was killed by this app-process.
    /// `None` in same-process mode, or after being taken by respawn or drop.
    /// Shared with the other-process event listener thread.
    process: Arc<Mutex<Option<(std::process::Child, bool)>>>,
    /// Connection state, updated by `handle_inited`, `handle_suspended` and respawn.
    view_state: ViewState,
    /// Generation of the current view-process, advanced on each respawn.
    generation: ViewProcessGen,
    /// Set to `true` once a respawn has happened; passed to `init`.
    is_respawn: bool,
    /// Executable spawned as the view-process.
    view_process_exe: PathBuf,
    /// Extra environment variables set on the spawned view-process.
    view_process_env: HashMap<Txt, Txt>,
    /// Sends requests and commands to the view-process.
    request_sender: ipc::RequestSender,
    /// Receives responses to requests sent with `talk`.
    response_receiver: ipc::ResponseReceiver,
    /// Event listener thread; only `None` transiently while respawning.
    event_listener: Option<EventListenerJoin>,
    /// If the view-process runs in headless mode.
    headless: bool,
    /// If the view-process runs in this process (there is no child process to manage).
    same_process: bool,
    /// Time of the last crash respawn, used to detect respawn loops.
    last_respawn: Option<Instant>,
    /// Count of consecutive crash respawns that happened less than 60s apart.
    fast_respawn_count: u8,
}
/// Compile-time check that `Controller` stays `Send + Sync`; building tests fails if a field
/// change ever removes either auto-trait.
#[cfg(test)]
fn _assert_sync(controller: Controller) -> impl Send + Sync {
    controller
}
67impl Controller {
68 pub fn start<F>(view_process_exe: PathBuf, view_process_env: HashMap<Txt, Txt>, headless: bool, on_event: F) -> Self
94 where
95 F: FnMut(Event) + Send + 'static,
96 {
97 Self::start_impl(view_process_exe, view_process_env, headless, Box::new(on_event))
98 }
99 fn start_impl(
100 view_process_exe: PathBuf,
101 view_process_env: HashMap<Txt, Txt>,
102 headless: bool,
103 on_event: Box<dyn FnMut(Event) + Send>,
104 ) -> Self {
105 if ViewConfig::from_env().is_some() {
106 panic!("cannot start Controller in process configured to be view-process");
107 }
108
109 let (process, request_sender, response_receiver, event_receiver) =
110 Self::spawn_view_process(&view_process_exe, &view_process_env, headless).expect("failed to spawn or connect to view-process");
111 let same_process = process.is_none();
112 let process = Arc::new(Mutex::new(process.map(|p| (p, false))));
113 let ev = if same_process {
114 Self::spawn_same_process_listener(on_event, event_receiver, ViewProcessGen::first())
115 } else {
116 Self::spawn_other_process_listener(on_event, event_receiver, process.clone(), ViewProcessGen::first())
117 };
118
119 let mut c = Controller {
120 same_process,
121 view_state: ViewState::NotRunning,
122 process,
123 view_process_exe,
124 view_process_env,
125 request_sender,
126 response_receiver,
127 event_listener: Some(ev),
128 headless,
129 generation: ViewProcessGen::INVALID,
130 is_respawn: false,
131 last_respawn: None,
132 fast_respawn_count: 0,
133 };
134
135 if let Err(ChannelError::Disconnected { .. }) = c.try_init() {
136 panic!("respawn on init");
137 }
138
139 c
140 }
141 fn spawn_same_process_listener(
142 mut on_event: Box<dyn FnMut(Event) + Send>,
143 mut event_receiver: EventReceiver,
144 generation: ViewProcessGen,
145 ) -> std::thread::JoinHandle<Box<dyn FnMut(Event) + Send>> {
146 thread::Builder::new()
147 .name("same_process_listener".into())
148 .spawn(move || {
149 while let Ok(ev) = event_receiver.recv() {
150 on_event(ev);
151 }
152 on_event(Event::Disconnected(generation));
153
154 on_event
156 })
157 .expect("failed to spawn thread")
158 }
159 fn spawn_other_process_listener(
160 mut on_event: Box<dyn FnMut(Event) + Send>,
161 mut event_receiver: EventReceiver,
162 process: Arc<Mutex<Option<(std::process::Child, bool)>>>,
163 generation: ViewProcessGen,
164 ) -> std::thread::JoinHandle<Box<dyn FnMut(Event) + Send>> {
165 thread::Builder::new()
169 .name("other_process_listener".into())
170 .spawn(move || {
171 const PROCESS_CHECK_DUR: Duration = Duration::from_secs(1);
172 let timeout = view_timeout();
173 let mut check_count = 0u64;
174 loop {
175 match event_receiver.recv_timeout(PROCESS_CHECK_DUR) {
176 Ok(ev) => {
177 check_count = 0;
178 on_event(ev)
179 }
180 Err(ChannelError::Timeout) => {
181 if let Some(p) = &mut *process.lock() {
182 match p.0.try_wait() {
183 Ok(c) => {
184 if c.is_some() {
185 break;
187 } else {
188 check_count += 1;
189 if check_count == timeout {
190 tracing::error!("view-process not responding for {timeout}s, will respawn");
191 let _ = p.0.kill();
192 p.1 = true;
193 break;
194 }
195 }
196 }
197 Err(e) => {
198 if e.kind() != std::io::ErrorKind::Interrupted {
199 tracing::error!("view-process try_wait error after inactivity, {e}");
200 break;
201 }
202 }
203 }
204 } else {
205 break;
207 }
208 }
209 Err(_) => break,
210 }
211 }
212 on_event(Event::Disconnected(generation));
213
214 on_event
216 })
217 .expect("failed to spawn thread")
218 }
219
220 fn try_init(&mut self) -> VpResult<()> {
221 self.init(self.generation, self.is_respawn, self.headless)?;
222 Ok(())
223 }
224
225 pub fn is_connected(&self) -> bool {
227 matches!(self.view_state, ViewState::RunningAndConnected)
228 }
229
230 pub fn generation(&self) -> ViewProcessGen {
232 self.generation
233 }
234
235 pub fn headless(&self) -> bool {
237 self.headless
238 }
239
240 pub fn same_process(&self) -> bool {
242 self.same_process
243 }
244
245 fn try_talk(&mut self, req: Request) -> Result<Response, ChannelError> {
246 self.request_sender.send(req)?;
247 self.response_receiver.recv()
248 }
249 pub(crate) fn talk(&mut self, req: Request) -> VpResult<Response> {
250 debug_assert!(req.expect_response());
251
252 tracing::trace!("talk {req:?}");
253
254 if req.must_be_connected() && !self.is_connected() {
255 tracing::error!("cannot send request {req:?}, not connected");
256 return Err(ChannelError::disconnected());
257 }
258
259 match self.try_talk(req) {
260 Ok(r) => {
261 tracing::trace!("talk {r:?}");
262 Ok(r)
263 }
264 Err(ChannelError::Disconnected { cause }) => {
265 self.handle_disconnect(self.generation);
266 Err(ChannelError::Disconnected { cause })
267 }
268 e => e,
269 }
270 }
271
272 pub(crate) fn command(&mut self, req: Request) -> Result<(), ChannelError> {
273 debug_assert!(!req.expect_response());
274
275 tracing::trace!("command {req:?}");
276
277 if req.must_be_connected() && !self.is_connected() {
278 tracing::error!("cannot send request {req:?}, not connected");
279 return Err(ChannelError::disconnected());
280 }
281
282 match self.request_sender.send(req) {
283 Ok(_) => {
284 tracing::trace!("command ok");
285 Ok(())
286 }
287 Err(ChannelError::Disconnected { cause }) => {
288 self.handle_disconnect(self.generation);
289 Err(ChannelError::Disconnected { cause })
290 }
291 e => e,
292 }
293 }
294
295 fn spawn_view_process(
296 view_process_exe: &Path,
297 view_process_env: &HashMap<Txt, Txt>,
298 headless: bool,
299 ) -> AnyResult<(
300 Option<std::process::Child>,
301 ipc::RequestSender,
302 ipc::ResponseReceiver,
303 ipc::EventReceiver,
304 )> {
305 let _span = tracing::trace_span!("spawn_view_process").entered();
306
307 let init = ipc::AppInit::new();
308
309 let process = if ViewConfig::is_awaiting_same_process() {
311 ViewConfig::set_same_process(ViewConfig {
312 version: crate::VERSION.into(),
313 server_name: Txt::from_str(init.name()),
314 headless,
315 });
316 None
317 } else {
318 #[cfg(not(ipc))]
319 {
320 let _ = (view_process_exe, view_process_env);
321 panic!("expected only same_process mode with `ipc` feature disabled");
322 }
323
324 #[cfg(ipc)]
325 {
326 let mut process = std::process::Command::new(view_process_exe);
327 for (name, val) in view_process_env {
328 process.env(name, val);
329 }
330 let process = process
331 .env(VIEW_VERSION, crate::VERSION)
332 .env(VIEW_SERVER, init.name())
333 .env(VIEW_MODE, if headless { "headless" } else { "headed" })
334 .env("RUST_BACKTRACE", "full")
335 .spawn()?;
336 Some(process)
337 }
338 };
339
340 let (req, rsp, ev) = match init.connect() {
341 Ok(r) => r,
342 Err(e) => {
343 #[cfg(ipc)]
344 if let Some(mut p) = process {
345 if let Err(ke) = p.kill() {
346 tracing::error!(
347 "failed to kill new view-process after failing to connect to it\n connection error: {e:?}\n kill error: {ke:?}",
348 );
349 } else {
350 match p.wait() {
351 Ok(output) => {
352 let code = output.code();
353 if ViewConfig::is_version_err(code, None) {
354 let code = code.unwrap_or(1);
355 tracing::error!(
356 "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
357 will exit app-process with code 0x{code:x}"
358 );
359 zng_env::exit(code);
360 } else {
361 tracing::error!("view-process exit code: {}", output.code().unwrap_or(1));
362 }
363 }
364 Err(e) => {
365 tracing::error!("failed to read output status of killed view-process, {e}");
366 }
367 }
368 }
369 } else {
370 tracing::error!("failed to connect with same process");
371 }
372 return Err(e);
373 }
374 };
375
376 Ok((process, req, rsp, ev))
377 }
378
379 pub fn handle_inited(&mut self, vp_gen: ViewProcessGen) {
383 match self.view_state {
384 ViewState::NotRunning => {
385 if self.generation == vp_gen {
386 self.view_state = ViewState::RunningAndConnected;
388 }
389 }
390 ViewState::Suspended => {
391 self.generation = vp_gen;
392 self.view_state = ViewState::RunningAndConnected;
393 }
394 ViewState::RunningAndConnected => {}
395 }
396 }
397
398 pub fn handle_suspended(&mut self) {
402 self.view_state = ViewState::Suspended;
403 }
404
405 pub fn handle_disconnect(&mut self, vp_gen: ViewProcessGen) {
425 if vp_gen == self.generation {
426 #[cfg(not(ipc))]
427 {
428 tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
429 }
430
431 #[cfg(ipc)]
432 {
433 self.respawn_impl(true)
434 }
435 } else {
436 tracing::warn!("disconnected event from previous generation ignored")
437 }
438 }
439
440 pub fn respawn(&mut self) {
447 #[cfg(not(ipc))]
448 {
449 tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
450 }
451
452 #[cfg(ipc)]
453 self.respawn_impl(false);
454 }
455 #[cfg(ipc)]
456 fn respawn_impl(&mut self, is_crash: bool) {
457 use zng_unit::TimeUnits;
458
459 self.view_state = ViewState::NotRunning;
460 self.is_respawn = true;
461
462 let (mut process, mut killed_by_us) = if let Some(p) = self.process.lock().take() {
463 p
464 } else {
465 if self.same_process {
466 tracing::error!(target: "vp_respawn", "cannot recover in same_process mode");
467 }
468 return;
469 };
470 if is_crash {
471 tracing::error!(target: "vp_respawn", "channel disconnect, will try respawn");
472 }
473
474 if is_crash {
475 let t = Instant::now();
476 if let Some(last_respawn) = self.last_respawn {
477 if t - last_respawn < Duration::from_secs(60) {
478 self.fast_respawn_count += 1;
479 if self.fast_respawn_count == 2 {
480 panic!("disconnect respawn happened 2 times in less than 1 minute");
481 }
482 } else {
483 self.fast_respawn_count = 0;
484 }
485 }
486 self.last_respawn = Some(t);
487 } else {
488 self.last_respawn = None;
489 }
490
491 if !is_crash {
493 let _ = process.kill();
494 killed_by_us = true;
495 } else if !matches!(process.try_wait(), Ok(Some(_))) {
496 thread::sleep(300.ms());
498
499 if !matches!(process.try_wait(), Ok(Some(_))) {
500 killed_by_us = true;
502 let _ = process.kill();
503 }
504 }
505
506 let exit_status = match process.wait() {
507 Ok(c) => Some(c),
508 Err(e) => {
509 tracing::error!(target: "vp_respawn", "view-process could not be killed, will abandon running, {e:?}");
510 None
511 }
512 };
513
514 if let Some(c) = exit_status {
516 tracing::info!(target: "vp_respawn", "view-process killed");
517
518 let code = c.code();
519 #[allow(unused_mut)]
520 let mut signal = None::<i32>;
521
522 if !killed_by_us {
523 #[cfg(windows)]
526 if code == Some(1) {
527 tracing::warn!(target: "vp_respawn", "view-process exit code (1), probably killed by the system, \
528 will exit app-process with the same code");
529 zng_env::exit(1);
530 }
531
532 #[cfg(unix)]
533 if code.is_none() {
534 use std::os::unix::process::ExitStatusExt as _;
535 signal = c.signal();
536
537 if let Some(sig) = signal
538 && [2, 9, 17, 19, 23].contains(&sig)
539 {
540 tracing::warn!(target: "vp_respawn", "view-process exited by signal ({sig}), \
541 will exit app-process with code 1");
542 zng_env::exit(1);
543 }
544 }
545 }
546
547 if !killed_by_us {
548 let code = code.unwrap_or(0);
549 let signal = signal.unwrap_or(0);
550 tracing::error!(target: "vp_respawn", "view-process exit code: {code:#X}, signal: {signal}");
551 }
552
553 if ViewConfig::is_version_err(code, None) {
554 let code = code.unwrap_or(1);
555 tracing::error!(target: "vp_respawn", "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
556 will exit app-process with code 0x{code:x}");
557 zng_env::exit(code);
558 }
559 } else {
560 tracing::error!(target: "vp_respawn", "failed to kill view-process, will abandon it running and spawn a new one");
561 }
562
563 let on_event = match self.event_listener.take().unwrap().join() {
565 Ok(fn_) => fn_,
566 Err(p) => panic::resume_unwind(p),
567 };
568
569 let mut retries = 3;
571 let (new_process, request, response, event_listener) = loop {
572 match Self::spawn_view_process(&self.view_process_exe, &self.view_process_env, self.headless) {
573 Ok(r) => break r,
574 Err(e) => {
575 tracing::error!(target: "vp_respawn", "failed to respawn, {e:?}");
576 retries -= 1;
577 if retries == 0 {
578 panic!("failed to respawn `view-process` after 3 retries");
579 }
580 tracing::info!(target: "vp_respawn", "retrying respawn");
581 }
582 }
583 };
584
585 self.process = Arc::new(Mutex::new(Some((new_process.unwrap(), false))));
587 self.request_sender = request;
588 self.response_receiver = response;
589
590 let next_id = self.generation.next();
591 self.generation = next_id;
592
593 let ev = Self::spawn_other_process_listener(on_event, event_listener, self.process.clone(), self.generation);
594 self.event_listener = Some(ev);
595
596 if let Err(ChannelError::Disconnected { .. }) = self.try_init() {
597 panic!("respawn on respawn startup");
598 }
599 }
600}
601impl Drop for Controller {
602 fn drop(&mut self) {
604 let _ = self.exit();
605 #[cfg(ipc)]
606 if let Some((mut process, _)) = self.process.lock().take()
607 && process.try_wait().is_err()
608 {
609 std::thread::sleep(Duration::from_secs(1));
610 if process.try_wait().is_err() {
611 tracing::error!("view-process did not exit after 1s, killing");
612 let _ = process.kill();
613 let _ = process.wait();
614 }
615 }
616 }
617}
618
619const VIEW_TIMEOUT: &str = "ZNG_VIEW_TIMEOUT";
620pub(crate) fn view_timeout() -> u64 {
622 match std::env::var(VIEW_TIMEOUT) {
623 Ok(s) if !s.is_empty() => match s.parse::<u64>() {
624 Ok(s) => s.max(5),
625 Err(e) => {
626 if s == "false" {
627 return u64::MAX;
628 }
629 tracing::error!("invalid {VIEW_TIMEOUT:?} value, {e}");
630 10
631 }
632 },
633 _ => 10,
634 }
635}