zng_view_api/
ipc.rs

1//! IPC types.
2
3use std::time::Duration;
4
5use crate::{AnyResult, Event, Request, Response};
6
7use parking_lot::Mutex;
8use zng_task::channel::{self, ChannelError, IpcReceiver, IpcSender};
9use zng_txt::Txt;
10
11type AppInitMsg = (
12    channel::IpcReceiver<Request>,
13    channel::IpcSender<Response>,
14    channel::IpcSender<Event>,
15);
16
17/// Call `new`, then spawn the view-process using the `name` then call `connect`.
18pub(crate) struct AppInit {
19    init_sender: channel::NamedIpcSender<AppInitMsg>,
20}
21impl AppInit {
22    pub fn new() -> Self {
23        AppInit {
24            init_sender: channel::NamedIpcSender::new().expect("failed to create init channel"),
25        }
26    }
27
28    /// Unique name for the view-process to find this channel.
29    pub fn name(&self) -> &str {
30        self.init_sender.name()
31    }
32
33    /// Tries to connect to the view-process and receive the actual channels.
34    pub fn connect(self) -> AnyResult<(RequestSender, ResponseReceiver, EventReceiver)> {
35        let mut init_sender = self
36            .init_sender
37            .connect_deadline_blocking(std::time::Duration::from_secs(crate::view_timeout()))?;
38
39        let (req_sender, req_recv) = channel::ipc_unbounded()?;
40        let (rsp_sender, rsp_recv) = channel::ipc_unbounded()?;
41        let (evt_sender, evt_recv) = channel::ipc_unbounded()?;
42        init_sender.send_blocking((req_recv, rsp_sender, evt_sender))?;
43        Ok((
44            RequestSender(Mutex::new(req_sender)),
45            ResponseReceiver(Mutex::new(rsp_recv)),
46            EventReceiver(Mutex::new(evt_recv)),
47        ))
48    }
49}
50
51/// Start the view-process server and waits for `(request, response, event)`.
52pub fn connect_view_process(ipc_sender_name: Txt) -> Result<ViewChannels, channel::ChannelError> {
53    let _s = tracing::trace_span!("connect_view_process").entered();
54
55    let mut init_recv = channel::IpcReceiver::<AppInitMsg>::connect(ipc_sender_name)?;
56
57    let (req_recv, rsp_sender, evt_sender) = init_recv.recv_deadline_blocking(std::time::Duration::from_secs(crate::view_timeout()))?;
58
59    Ok(ViewChannels {
60        request_receiver: RequestReceiver(Mutex::new(req_recv)),
61        response_sender: ResponseSender(Mutex::new(rsp_sender)),
62        event_sender: EventSender(Mutex::new(evt_sender)),
63    })
64}
65
66/// Channels that must be used for implementing a view-process.
67pub struct ViewChannels {
68    /// View implementers must receive requests from this channel, call [`Api::respond`] and then
69    /// return the response using the `response_sender`.
70    ///
71    /// [`Api::respond`]: crate::Api::respond
72    pub request_receiver: RequestReceiver,
73
74    /// View implementers must synchronously send one response per request received in `request_receiver`.
75    pub response_sender: ResponseSender,
76
77    /// View implements must send events using this channel. Events can be asynchronous.
78    pub event_sender: EventSender,
79}
80
81type IpcResult<T> = Result<T, ChannelError>;
82
83pub(crate) struct RequestSender(Mutex<IpcSender<Request>>);
84impl RequestSender {
85    pub fn send(&mut self, req: Request) -> IpcResult<()> {
86        let r = self.0.get_mut().send_blocking(req);
87        if let Err(e) = &r {
88            tracing::debug!("request sender error, {e}");
89        }
90        r
91    }
92}
93impl Drop for RequestSender {
94    fn drop(&mut self) {
95        tracing::trace!("dropped RequestSender");
96    }
97}
98
99/// Requests channel end-point.
100///
101/// View-process implementers must receive [`Request`], call [`Api::respond`] and then use a [`ResponseSender`]
102/// to send back the response.
103///
104/// [`Api::respond`]: crate::Api::respond
105pub struct RequestReceiver(Mutex<IpcReceiver<Request>>); // Mutex for Sync
106impl RequestReceiver {
107    /// Receive one [`Request`].
108    pub fn recv(&mut self) -> IpcResult<Request> {
109        let r = self.0.get_mut().recv_blocking();
110        if let Err(e) = &r {
111            tracing::debug!("request receiver error, {e}");
112        }
113        r
114    }
115}
116impl Drop for RequestReceiver {
117    fn drop(&mut self) {
118        tracing::trace!("dropped RequestReceiver");
119    }
120}
121
122/// Responses channel entry-point.
123///
124/// View-process implementers must send [`Response`] returned by [`Api::respond`] using this sender.
125///
126/// Requests are received using [`RequestReceiver`] a response must be send for each request, synchronously.
127///
128/// [`Api::respond`]: crate::Api::respond
129pub struct ResponseSender(Mutex<IpcSender<Response>>); // Mutex for Sync
130impl ResponseSender {
131    /// Send a response.
132    ///
133    /// # Panics
134    ///
135    /// If the `rsp` is not [`must_be_send`].
136    ///
137    /// [`must_be_send`]: Response::must_be_send
138    pub fn send(&mut self, rsp: Response) -> IpcResult<()> {
139        assert!(rsp.must_be_send());
140        let r = self.0.get_mut().send_blocking(rsp);
141        if let Err(e) = &r {
142            tracing::debug!("response sender error, {e}");
143        }
144        r
145    }
146}
147impl Drop for ResponseSender {
148    fn drop(&mut self) {
149        tracing::trace!("dropped ResponseSender");
150    }
151}
152
153pub(crate) struct ResponseReceiver(Mutex<IpcReceiver<Response>>);
154impl ResponseReceiver {
155    pub fn recv(&mut self) -> IpcResult<Response> {
156        let r = self.0.get_mut().recv_blocking();
157        if let Err(e) = &r {
158            tracing::debug!("response receiver error, {e}");
159        }
160        r
161    }
162}
163impl Drop for ResponseReceiver {
164    fn drop(&mut self) {
165        tracing::trace!("dropped ResponseReceiver");
166    }
167}
168
169/// Event channel entry-point.
170///
171/// View-process implementers must send [`Event`] messages using this sender. The events
172/// can be asynchronous, not related to the [`Api::respond`] calls.
173///
174/// [`Api::respond`]: crate::Api::respond
175pub struct EventSender(Mutex<IpcSender<Event>>);
176impl EventSender {
177    /// Send an event notification.
178    pub fn send(&mut self, ev: Event) -> IpcResult<()> {
179        let r = self.0.get_mut().send_blocking(ev);
180        if let Err(e) = &r {
181            tracing::debug!("event sender error, {e}");
182        }
183        r
184    }
185}
186pub(crate) struct EventReceiver(Mutex<IpcReceiver<Event>>);
187impl EventReceiver {
188    pub fn recv(&mut self) -> IpcResult<Event> {
189        let r = self.0.get_mut().recv_blocking();
190        if let Err(e) = &r {
191            tracing::debug!("event receiver error, {e}");
192        }
193        r
194    }
195
196    pub fn recv_timeout(&mut self, duration: Duration) -> IpcResult<Event> {
197        let r = self.0.get_mut().recv_deadline_blocking(duration);
198        if let Err(e) = &r {
199            match e {
200                ChannelError::Timeout => {}
201                e => tracing::debug!("event receiver error, {e}"),
202            }
203        }
204        r
205    }
206}