1use std::{
2 collections::HashMap,
3 panic,
4 path::{Path, PathBuf},
5 thread::{self, JoinHandle},
6 time::Instant,
7};
8
9#[cfg(ipc)]
10use std::time::Duration;
11
12use zng_txt::Txt;
13
14use crate::{AnyResult, Event, Request, Response, ViewConfig, ViewProcessGen, ViewProcessOffline, VpResult, ipc};
15
16type EventListenerJoin = JoinHandle<Box<dyn FnMut(Event) + Send>>;
18
/// Name of the environment variable that carries the app-process API version to a spawned view-process.
pub(crate) const VIEW_VERSION: &str = "ZNG_VIEW_VERSION";
/// Name of the environment variable that carries the IPC server name the view-process must connect to.
pub(crate) const VIEW_SERVER: &str = "ZNG_VIEW_SERVER";
/// Name of the environment variable that selects the view-process mode (`"headless"` or `"headed"`).
pub(crate) const VIEW_MODE: &str = "ZNG_VIEW_MODE";
22
/// Connection state of the view-process as tracked by the controller.
#[derive(Copy, Clone)]
enum ViewState {
    /// Not yet initialized, or disconnected and being respawned; requests that must be online fail.
    Offline,
    /// Initialized for the current generation; all requests are accepted.
    Online,
    /// Suspended; the next "inited" notification adopts its generation and goes back online.
    Suspended,
}
29
/// App-process side handle of a view-process.
///
/// Spawns (or connects in same-process mode to) the view-process, sends requests and commands to it,
/// forwards its events to a callback on a dedicated thread and respawns it on disconnect.
// Some fields (exe path, env, respawn tracking) are only read by the `ipc`-only respawn code.
#[cfg_attr(not(ipc), allow(unused))]
pub struct Controller {
    /// Handle of the spawned view-process, `None` in same-process mode.
    process: Option<std::process::Child>,
    /// Current connection state, see `ViewState`.
    view_state: ViewState,
    /// Generation of the current view-process, advanced on every respawn.
    generation: ViewProcessGen,
    /// If the current view-process was spawned by a respawn (passed to `init`).
    is_respawn: bool,
    /// Executable used to spawn (and respawn) the view-process.
    view_process_exe: PathBuf,
    /// Extra environment variables set on every spawned view-process.
    view_process_env: HashMap<Txt, Txt>,
    /// Channel used to send requests to the view-process.
    request_sender: ipc::RequestSender,
    /// Channel used to receive responses to requests.
    response_receiver: ipc::ResponseReceiver,
    /// Event thread handle, `Option` so it can be taken and joined during respawn.
    event_listener: Option<EventListenerJoin>,
    /// If the view-process was started in headless mode.
    headless: bool,
    /// If the view-process runs in the same process as the app (no process handle).
    same_process: bool,
    /// If device events were requested on init.
    device_events: bool,
    /// Instant of the last crash-triggered respawn, used to detect respawn loops.
    last_respawn: Option<Instant>,
    /// Number of consecutive crash respawns that happened less than 1 minute apart.
    fast_respawn_count: u8,
}
/// Compile-time assertion that `Controller` is `Send + Sync`; never called, only type-checked in test builds.
#[cfg(test)]
fn _assert_sync(x: Controller) -> impl Send + Sync {
    fn require_send_sync<T: Send + Sync>(value: T) -> T {
        value
    }
    require_send_sync(x)
}
60impl Controller {
61 pub fn start<F>(
80 view_process_exe: PathBuf,
81 view_process_env: HashMap<Txt, Txt>,
82 device_events: bool,
83 headless: bool,
84 on_event: F,
85 ) -> Self
86 where
87 F: FnMut(Event) + Send + 'static,
88 {
89 Self::start_impl(view_process_exe, view_process_env, device_events, headless, Box::new(on_event))
90 }
91 fn start_impl(
92 view_process_exe: PathBuf,
93 view_process_env: HashMap<Txt, Txt>,
94 device_events: bool,
95 headless: bool,
96 mut on_event: Box<dyn FnMut(Event) + Send>,
97 ) -> Self {
98 if ViewConfig::from_env().is_some() {
99 panic!("cannot start Controller in process configured to be view-process");
100 }
101
102 let (process, request_sender, response_receiver, mut event_receiver) =
103 Self::spawn_view_process(&view_process_exe, &view_process_env, headless).expect("failed to spawn or connect to view-process");
104
105 let ev = thread::spawn(move || {
106 while let Ok(ev) = event_receiver.recv() {
107 on_event(ev);
108 }
109 on_event(Event::Disconnected(ViewProcessGen::first()));
110
111 on_event
113 });
114
115 let mut c = Controller {
116 same_process: process.is_none(),
117 view_state: ViewState::Offline,
118 process,
119 view_process_exe,
120 view_process_env,
121 request_sender,
122 response_receiver,
123 event_listener: Some(ev),
124 headless,
125 device_events,
126 generation: ViewProcessGen::first(),
127 is_respawn: false,
128 last_respawn: None,
129 fast_respawn_count: 0,
130 };
131
132 if let Err(ViewProcessOffline) = c.try_init() {
133 panic!("respawn on init");
134 }
135
136 c
137 }
138
139 fn try_init(&mut self) -> VpResult<()> {
140 self.init(self.generation, self.is_respawn, self.device_events, self.headless)?;
141 Ok(())
142 }
143
144 pub fn online(&self) -> bool {
146 matches!(self.view_state, ViewState::Online)
147 }
148
149 pub fn generation(&self) -> ViewProcessGen {
151 self.generation
152 }
153
154 pub fn headless(&self) -> bool {
156 self.headless
157 }
158
159 pub fn device_events(&self) -> bool {
161 self.device_events
162 }
163
164 pub fn same_process(&self) -> bool {
166 self.same_process
167 }
168
169 fn offline_err(&self) -> Result<(), ViewProcessOffline> {
170 if self.online() { Ok(()) } else { Err(ViewProcessOffline) }
171 }
172
173 fn try_talk(&mut self, req: Request) -> ipc::IpcResult<Response> {
174 self.request_sender.send(req)?;
175 self.response_receiver.recv()
176 }
177 pub(crate) fn talk(&mut self, req: Request) -> VpResult<Response> {
178 debug_assert!(req.expect_response());
179
180 if req.must_be_online() {
181 self.offline_err()?;
182 }
183
184 match self.try_talk(req) {
185 Ok(r) => Ok(r),
186 Err(ipc::Disconnected) => {
187 self.handle_disconnect(self.generation);
188 Err(ViewProcessOffline)
189 }
190 }
191 }
192
193 pub(crate) fn command(&mut self, req: Request) -> VpResult<()> {
194 debug_assert!(!req.expect_response());
195
196 if req.must_be_online() {
197 self.offline_err()?;
198 }
199
200 match self.request_sender.send(req) {
201 Ok(_) => Ok(()),
202 Err(ipc::Disconnected) => {
203 self.handle_disconnect(self.generation);
204 Err(ViewProcessOffline)
205 }
206 }
207 }
208
209 fn spawn_view_process(
210 view_process_exe: &Path,
211 view_process_env: &HashMap<Txt, Txt>,
212 headless: bool,
213 ) -> AnyResult<(
214 Option<std::process::Child>,
215 ipc::RequestSender,
216 ipc::ResponseReceiver,
217 ipc::EventReceiver,
218 )> {
219 let _span = tracing::trace_span!("spawn_view_process").entered();
220
221 let init = ipc::AppInit::new();
222
223 let process = if ViewConfig::is_awaiting_same_process() {
225 ViewConfig::set_same_process(ViewConfig {
226 version: crate::VERSION.into(),
227 server_name: Txt::from_str(init.name()),
228 headless,
229 });
230 None
231 } else {
232 #[cfg(not(ipc))]
233 {
234 let _ = (view_process_exe, view_process_env);
235 panic!("expected only same_process mode with `ipc` feature disabled");
236 }
237
238 #[cfg(ipc)]
239 {
240 let mut process = std::process::Command::new(view_process_exe);
241 for (name, val) in view_process_env {
242 process.env(name, val);
243 }
244 let process = process
245 .env(VIEW_VERSION, crate::VERSION)
246 .env(VIEW_SERVER, init.name())
247 .env(VIEW_MODE, if headless { "headless" } else { "headed" })
248 .env("RUST_BACKTRACE", "full")
249 .spawn()?;
250 Some(process)
251 }
252 };
253
254 let (req, rsp, ev) = match init.connect() {
255 Ok(r) => r,
256 Err(e) => {
257 #[cfg(ipc)]
258 if let Some(mut p) = process {
259 if let Err(ke) = p.kill() {
260 tracing::error!(
261 "failed to kill new view-process after failing to connect to it\n connection error: {e:?}\n kill error: {ke:?}",
262 );
263 } else {
264 match p.wait() {
265 Ok(output) => {
266 let code = output.code();
267 if ViewConfig::is_version_err(code, None) {
268 let code = code.unwrap_or(1);
269 tracing::error!(
270 "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
271 will exit app-process with code 0x{code:x}"
272 );
273 zng_env::exit(code);
274 } else {
275 tracing::error!("view-process exit code: {}", output.code().unwrap_or(1));
276 }
277 }
278 Err(e) => {
279 tracing::error!("failed to read output status of killed view-process, {e}");
280 }
281 }
282 }
283 } else {
284 tracing::error!("failed to connect with same process");
285 }
286 return Err(e);
287 }
288 };
289
290 Ok((process, req, rsp, ev))
291 }
292
293 pub fn handle_inited(&mut self, vp_gen: ViewProcessGen) {
297 match self.view_state {
298 ViewState::Offline => {
299 if self.generation == vp_gen {
300 self.view_state = ViewState::Online;
302 }
303 }
304 ViewState::Suspended => {
305 self.generation = vp_gen;
306 self.view_state = ViewState::Online;
307 }
308 ViewState::Online => {}
309 }
310 }
311
312 pub fn handle_suspended(&mut self) {
316 self.view_state = ViewState::Suspended;
317 }
318
319 pub fn handle_disconnect(&mut self, vp_gen: ViewProcessGen) {
339 if vp_gen == self.generation {
340 #[cfg(not(ipc))]
341 {
342 tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
343 }
344
345 #[cfg(ipc)]
346 {
347 self.respawn_impl(true)
348 }
349 }
350 }
351
352 pub fn respawn(&mut self) {
359 #[cfg(not(ipc))]
360 {
361 tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
362 }
363
364 #[cfg(ipc)]
365 self.respawn_impl(false);
366 }
367 #[cfg(ipc)]
368 fn respawn_impl(&mut self, is_crash: bool) {
369 use zng_unit::TimeUnits;
370
371 self.view_state = ViewState::Offline;
372 self.is_respawn = true;
373
374 let mut process = if let Some(p) = self.process.take() {
375 p
376 } else {
377 if self.same_process {
378 tracing::error!(target: "vp_respawn", "cannot recover in same_process mode");
379 }
380 return;
381 };
382 if is_crash {
383 tracing::error!(target: "vp_respawn", "channel disconnect, will try respawn");
384 }
385
386 if is_crash {
387 let t = Instant::now();
388 if let Some(last_respawn) = self.last_respawn {
389 if t - last_respawn < Duration::from_secs(60) {
390 self.fast_respawn_count += 1;
391 if self.fast_respawn_count == 2 {
392 panic!("disconnect respawn happened 2 times less than 1 minute apart");
393 }
394 } else {
395 self.fast_respawn_count = 0;
396 }
397 }
398 self.last_respawn = Some(t);
399 } else {
400 self.last_respawn = None;
401 }
402
403 let mut killed_by_us = false;
405 if !is_crash {
406 let _ = process.kill();
407 killed_by_us = true;
408 } else if !matches!(process.try_wait(), Ok(Some(_))) {
409 thread::sleep(300.ms());
411
412 if !matches!(process.try_wait(), Ok(Some(_))) {
413 killed_by_us = true;
415 let _ = process.kill();
416 }
417 }
418
419 let code_and_output = match process.wait() {
420 Ok(c) => Some(c),
421 Err(e) => {
422 tracing::error!(target: "vp_respawn", "view-process could not be killed, will abandon running, {e:?}");
423 None
424 }
425 };
426
427 if let Some(c) = code_and_output {
429 tracing::info!(target: "vp_respawn", "view-process killed");
430
431 let code = c.code();
432 #[allow(unused_mut)]
433 let mut signal = None::<i32>;
434
435 if !killed_by_us {
436 #[cfg(windows)]
439 if code == Some(1) {
440 tracing::warn!(target: "vp_respawn", "view-process exit code (1), probably killed by the system, \
441 will exit app-process with the same code");
442 zng_env::exit(1);
443 }
444
445 #[cfg(unix)]
446 if code.is_none() {
447 use std::os::unix::process::ExitStatusExt as _;
448 signal = c.signal();
449
450 if let Some(sig) = signal {
451 if [2, 9, 17, 19, 23].contains(&sig) {
452 tracing::warn!(target: "vp_respawn", "view-process exited by signal ({sig}), \
453 will exit app-process with code 1");
454 zng_env::exit(1);
455 }
456 }
457 }
458 }
459
460 if !killed_by_us {
461 let code = code.unwrap_or(0);
462 let signal = signal.unwrap_or(0);
463 tracing::error!(target: "vp_respawn", "view-process exit code: {code:#X}, signal: {signal}");
464 }
465
466 if ViewConfig::is_version_err(code, None) {
467 let code = code.unwrap_or(1);
468 tracing::error!(target: "vp_respawn", "view-process API version mismatch, the view-process build must use the same exact version as the app-process, \
469 will exit app-process with code 0x{code:x}");
470 zng_env::exit(code);
471 }
472 } else {
473 tracing::error!(target: "vp_respawn", "failed to kill view-process, will abandon it running and spawn a new one");
474 }
475
476 let mut on_event = match self.event_listener.take().unwrap().join() {
478 Ok(fn_) => fn_,
479 Err(p) => panic::resume_unwind(p),
480 };
481
482 let mut retries = 3;
484 let (new_process, request, response, mut event) = loop {
485 match Self::spawn_view_process(&self.view_process_exe, &self.view_process_env, self.headless) {
486 Ok(r) => break r,
487 Err(e) => {
488 tracing::error!(target: "vp_respawn", "failed to respawn, {e:?}");
489 retries -= 1;
490 if retries == 0 {
491 panic!("failed to respawn `view-process` after 3 retries");
492 }
493 tracing::info!(target: "vp_respawn", "retrying respawn");
494 }
495 }
496 };
497
498 self.process = new_process;
500 self.request_sender = request;
501 self.response_receiver = response;
502
503 let next_id = self.generation.next();
504 self.generation = next_id;
505
506 if let Err(ViewProcessOffline) = self.try_init() {
507 panic!("respawn on respawn startup");
508 }
509
510 let ev = thread::spawn(move || {
511 while let Ok(ev) = event.recv() {
512 on_event(ev);
513 }
514 on_event(Event::Disconnected(next_id));
515
516 on_event
517 });
518 self.event_listener = Some(ev);
519 }
520}
521impl Drop for Controller {
522 fn drop(&mut self) {
524 let _ = self.exit();
525 #[cfg(ipc)]
526 if let Some(mut process) = self.process.take() {
527 if process.try_wait().is_err() {
528 std::thread::sleep(Duration::from_secs(1));
529 if process.try_wait().is_err() {
530 tracing::error!("view-process did not exit after 1s, killing");
531 let _ = process.kill();
532 let _ = process.wait();
533 }
534 }
535 }
536 }
537}