zng_app/event/
channel.rs

1use std::time::Duration;
2
3use crate::{AppEventSender, update::UpdatesTrace};
4
5use super::*;
6
7pub(crate) struct EventUpdateMsg {
8    args: Box<dyn FnOnce() -> EventUpdate + Send>,
9}
10impl EventUpdateMsg {
11    pub(crate) fn get(self) -> EventUpdate {
12        (self.args)()
13    }
14}
15impl fmt::Debug for EventUpdateMsg {
16    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
17        f.debug_struct("EventUpdateMsg").finish_non_exhaustive()
18    }
19}
20
21/// An event update sender that can be used from any thread and without access to [`EVENTS`].
22///
23/// Use [`Event::sender`] to create a sender.
24pub struct EventSender<A>
25where
26    A: EventArgs + Send,
27{
28    pub(super) sender: AppEventSender,
29    pub(super) event: Event<A>,
30}
31impl<A> Clone for EventSender<A>
32where
33    A: EventArgs + Send,
34{
35    fn clone(&self) -> Self {
36        EventSender {
37            sender: self.sender.clone(),
38            event: self.event,
39        }
40    }
41}
42impl<A> fmt::Debug for EventSender<A>
43where
44    A: EventArgs + Send,
45{
46    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47        write!(f, "EventSender({:?})", &self.event)
48    }
49}
50impl<A> EventSender<A>
51where
52    A: EventArgs + Send,
53{
54    /// Send an event update.
55    pub fn send(&self, args: A) -> Result<(), AppDisconnected<A>> {
56        UpdatesTrace::log_event(self.event.as_any());
57
58        let event = self.event;
59        let msg = EventUpdateMsg {
60            args: Box::new(move || event.new_update(args)),
61        };
62
63        self.sender.send_event(msg).map_err(|e| {
64            if let Some(args) = (e.0.args)().args.as_any().downcast_ref::<A>() {
65                AppDisconnected(args.clone())
66            } else {
67                unreachable!()
68            }
69        })
70    }
71
72    /// Event that receives from this sender.
73    pub fn event(&self) -> Event<A> {
74        self.event
75    }
76}
77
78/// An event channel receiver.
79///
80/// Use [`Event::receiver`] to create a receiver, drop to stop listening.
81#[must_use = "stops receiving on drop"]
82pub struct EventReceiver<A>
83where
84    A: EventArgs + Send,
85{
86    pub(super) event: Event<A>,
87    pub(super) receiver: flume::Receiver<A>,
88}
89impl<A> Clone for EventReceiver<A>
90where
91    A: EventArgs + Send,
92{
93    fn clone(&self) -> Self {
94        EventReceiver {
95            event: self.event,
96            receiver: self.receiver.clone(),
97        }
98    }
99}
100impl<A> fmt::Debug for EventReceiver<A>
101where
102    A: EventArgs + Send,
103{
104    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105        write!(f, "EventSender({:?})", &self.event)
106    }
107}
108impl<A> EventReceiver<A>
109where
110    A: EventArgs + Send,
111{
112    /// Receives the oldest update, blocks until the event updates.
113    ///
114    /// Note that *oldest* here refers to send order (FIFO), not the args creation timestamp.
115    pub fn recv(&self) -> Result<A, AppDisconnected<()>> {
116        self.receiver.recv().map_err(|_| AppDisconnected(()))
117    }
118
119    /// Tries to receive the oldest sent update not received, returns `Ok(args)` if there was at least
120    /// one update, or returns `Err(None)` if there was no update or returns `Err(AppDisconnected)` if the connected
121    /// app has exited.
122    pub fn try_recv(&self) -> Result<A, Option<AppDisconnected<()>>> {
123        self.receiver.try_recv().map_err(|e| match e {
124            flume::TryRecvError::Empty => None,
125            flume::TryRecvError::Disconnected => Some(AppDisconnected(())),
126        })
127    }
128
129    /// Receives the oldest send update, blocks until the event updates or until the `deadline` is reached.
130    pub fn recv_deadline(&self, deadline: Instant) -> Result<A, TimeoutOrAppDisconnected> {
131        self.receiver.recv_deadline(deadline).map_err(TimeoutOrAppDisconnected::from)
132    }
133
134    /// Receives the oldest send update, blocks until the event updates or until timeout.
135    pub fn recv_timeout(&self, dur: Duration) -> Result<A, TimeoutOrAppDisconnected> {
136        self.receiver.recv_timeout(dur).map_err(TimeoutOrAppDisconnected::from)
137    }
138
139    /// Returns a future that receives the oldest send update, awaits until an event update occurs.
140    pub async fn recv_async(&self) -> Result<A, AppDisconnected<()>> {
141        RecvFut::from(self.receiver.recv_async()).await
142    }
143
144    /// Turns into a future that receives the oldest send update, awaits until an event update occurs.
145    pub fn into_recv_async(self) -> impl Future<Output = Result<A, AppDisconnected<()>>> + Send + Sync + 'static {
146        RecvFut::from(self.receiver.into_recv_async())
147    }
148
149    /// Creates a blocking iterator over event updates, if there are no updates sent the iterator blocks,
150    /// the iterator only finishes when the app shuts-down.
151    pub fn iter(&self) -> flume::Iter<A> {
152        self.receiver.iter()
153    }
154
155    /// Create a non-blocking iterator over event updates, the iterator finishes if
156    /// there are no more updates sent.
157    pub fn try_iter(&self) -> flume::TryIter<A> {
158        self.receiver.try_iter()
159    }
160
161    /// Event that sends to this receiver.
162    pub fn event(&self) -> Event<A> {
163        self.event
164    }
165}
166impl<A> From<EventReceiver<A>> for flume::Receiver<A>
167where
168    A: EventArgs + Send,
169{
170    fn from(e: EventReceiver<A>) -> Self {
171        e.receiver
172    }
173}
174impl<'a, A> IntoIterator for &'a EventReceiver<A>
175where
176    A: EventArgs + Send,
177{
178    type Item = A;
179
180    type IntoIter = flume::Iter<'a, A>;
181
182    fn into_iter(self) -> Self::IntoIter {
183        self.receiver.iter()
184    }
185}
186impl<A> IntoIterator for EventReceiver<A>
187where
188    A: EventArgs + Send,
189{
190    type Item = A;
191
192    type IntoIter = flume::IntoIter<A>;
193
194    fn into_iter(self) -> Self::IntoIter {
195        self.receiver.into_iter()
196    }
197}
198
199/// Error when the app connected to a sender/receiver channel has disconnected.
200///
201/// Contains the value that could not be send or `()` for receiver errors.
202pub struct AppDisconnected<T>(pub T);
203impl From<flume::RecvError> for AppDisconnected<()> {
204    fn from(_: flume::RecvError) -> Self {
205        AppDisconnected(())
206    }
207}
208impl<T> From<flume::SendError<T>> for AppDisconnected<T> {
209    fn from(e: flume::SendError<T>) -> Self {
210        AppDisconnected(e.0)
211    }
212}
213impl<T> fmt::Debug for AppDisconnected<T> {
214    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215        write!(f, "AppDisconnected<{}>", pretty_type_name::pretty_type_name::<T>())
216    }
217}
218impl<T> fmt::Display for AppDisconnected<T> {
219    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220        write!(f, "cannot send/receive because the app has disconnected")
221    }
222}
223impl<T> std::error::Error for AppDisconnected<T> {}
224
225/// Error when the app connected to a sender channel has disconnected or taken to long to respond.
226pub enum TimeoutOrAppDisconnected {
227    /// Connected app has not responded.
228    Timeout,
229    /// Connected app has disconnected.
230    AppDisconnected,
231}
232impl From<flume::RecvTimeoutError> for TimeoutOrAppDisconnected {
233    fn from(e: flume::RecvTimeoutError) -> Self {
234        match e {
235            flume::RecvTimeoutError::Timeout => TimeoutOrAppDisconnected::Timeout,
236            flume::RecvTimeoutError::Disconnected => TimeoutOrAppDisconnected::AppDisconnected,
237        }
238    }
239}
240impl fmt::Debug for TimeoutOrAppDisconnected {
241    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
242        if f.alternate() {
243            write!(f, "TimeoutOrAppDisconnected::")?;
244        }
245        match self {
246            TimeoutOrAppDisconnected::Timeout => write!(f, "Timeout"),
247            TimeoutOrAppDisconnected::AppDisconnected => write!(f, "AppDisconnected"),
248        }
249    }
250}
251impl fmt::Display for TimeoutOrAppDisconnected {
252    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
253        match self {
254            TimeoutOrAppDisconnected::Timeout => write!(f, "failed send, timeout"),
255            TimeoutOrAppDisconnected::AppDisconnected => write!(f, "cannot send because the app has disconnected"),
256        }
257    }
258}
259impl std::error::Error for TimeoutOrAppDisconnected {}
260
261/// A future that receives a single message from a running app.
262struct RecvFut<'a, M>(flume::r#async::RecvFut<'a, M>);
263impl<'a, M> From<flume::r#async::RecvFut<'a, M>> for RecvFut<'a, M> {
264    fn from(f: flume::r#async::RecvFut<'a, M>) -> Self {
265        Self(f)
266    }
267}
268impl<M> Future for RecvFut<'_, M> {
269    type Output = Result<M, AppDisconnected<()>>;
270
271    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
272        match std::pin::Pin::new(&mut self.0).poll(cx) {
273            std::task::Poll::Ready(r) => std::task::Poll::Ready(r.map_err(|_| AppDisconnected(()))),
274            std::task::Poll::Pending => std::task::Poll::Pending,
275        }
276    }
277}