zng_app/event/
channel.rs

1use std::time::Duration;
2
3use crate::{AppChannelError, AppEventSender, update::UpdatesTrace};
4
5use super::*;
6
7pub(crate) struct EventUpdateMsg {
8    args: Box<dyn FnOnce() -> EventUpdate + Send>,
9}
10impl EventUpdateMsg {
11    pub(crate) fn get(self) -> EventUpdate {
12        (self.args)()
13    }
14}
15impl fmt::Debug for EventUpdateMsg {
16    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
17        f.debug_struct("EventUpdateMsg").finish_non_exhaustive()
18    }
19}
20
21/// An event update sender that can be used from any thread and without access to [`EVENTS`].
22///
23/// Use [`Event::sender`] to create a sender.
24pub struct EventSender<A>
25where
26    A: EventArgs + Send,
27{
28    pub(super) sender: AppEventSender,
29    pub(super) event: Event<A>,
30}
31impl<A> Clone for EventSender<A>
32where
33    A: EventArgs + Send,
34{
35    fn clone(&self) -> Self {
36        EventSender {
37            sender: self.sender.clone(),
38            event: self.event,
39        }
40    }
41}
42impl<A> fmt::Debug for EventSender<A>
43where
44    A: EventArgs + Send,
45{
46    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47        write!(f, "EventSender({:?})", &self.event)
48    }
49}
50impl<A> EventSender<A>
51where
52    A: EventArgs + Send,
53{
54    /// Send an event update.
55    pub fn send(&self, args: A) -> Result<(), AppChannelError> {
56        UpdatesTrace::log_event(self.event.as_any());
57
58        let event = self.event;
59        let msg = EventUpdateMsg {
60            args: Box::new(move || event.new_update(args)),
61        };
62
63        self.sender.send_event(msg).map_err(|_| AppChannelError::Disconnected)
64    }
65
66    /// Event that receives from this sender.
67    pub fn event(&self) -> Event<A> {
68        self.event
69    }
70}
71
72/// An event channel receiver.
73///
74/// Use [`Event::receiver`] to create a receiver, drop to stop listening.
75#[must_use = "stops receiving on drop"]
76pub struct EventReceiver<A>
77where
78    A: EventArgs + Send,
79{
80    pub(super) event: Event<A>,
81    pub(super) receiver: flume::Receiver<A>,
82}
83impl<A> Clone for EventReceiver<A>
84where
85    A: EventArgs + Send,
86{
87    fn clone(&self) -> Self {
88        EventReceiver {
89            event: self.event,
90            receiver: self.receiver.clone(),
91        }
92    }
93}
94impl<A> fmt::Debug for EventReceiver<A>
95where
96    A: EventArgs + Send,
97{
98    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99        write!(f, "EventSender({:?})", &self.event)
100    }
101}
102impl<A> EventReceiver<A>
103where
104    A: EventArgs + Send,
105{
106    /// Receives the oldest update, blocks until the event updates.
107    ///
108    /// Note that *oldest* here refers to send order (FIFO), not the args creation timestamp.
109    pub fn recv(&self) -> Result<A, AppChannelError> {
110        self.receiver.recv().map_err(|_| AppChannelError::Disconnected)
111    }
112
113    /// Tries to receive the oldest sent update not received, returns `Ok(args)` if there was at least
114    /// one update, or returns `Err(None)` if there was no update or returns `Err(AppDisconnected)` if the connected
115    /// app has exited.
116    pub fn try_recv(&self) -> Result<Option<A>, AppChannelError> {
117        match self.receiver.try_recv() {
118            Ok(a) => Ok(Some(a)),
119            Err(e) => match e {
120                flume::TryRecvError::Empty => Ok(None),
121                flume::TryRecvError::Disconnected => Err(AppChannelError::Disconnected),
122            },
123        }
124    }
125
126    /// Receives the oldest send update, blocks until the event updates or until the `deadline` is reached.
127    pub fn recv_deadline(&self, deadline: Instant) -> Result<A, AppChannelError> {
128        self.receiver.recv_deadline(deadline).map_err(AppChannelError::from)
129    }
130
131    /// Receives the oldest send update, blocks until the event updates or until timeout.
132    pub fn recv_timeout(&self, dur: Duration) -> Result<A, AppChannelError> {
133        self.receiver.recv_timeout(dur).map_err(AppChannelError::from)
134    }
135
136    /// Returns a future that receives the oldest send update, awaits until an event update occurs.
137    pub async fn recv_async(&self) -> Result<A, AppChannelError> {
138        RecvFut::from(self.receiver.recv_async()).await
139    }
140
141    /// Turns into a future that receives the oldest send update, awaits until an event update occurs.
142    pub fn into_recv_async(self) -> impl Future<Output = Result<A, AppChannelError>> + Send + Sync + 'static {
143        RecvFut::from(self.receiver.into_recv_async())
144    }
145
146    /// Creates a blocking iterator over event updates, if there are no updates sent the iterator blocks,
147    /// the iterator only finishes when the app shuts-down.
148    pub fn iter(&self) -> flume::Iter<'_, A> {
149        self.receiver.iter()
150    }
151
152    /// Create a non-blocking iterator over event updates, the iterator finishes if
153    /// there are no more updates sent.
154    pub fn try_iter(&self) -> flume::TryIter<'_, A> {
155        self.receiver.try_iter()
156    }
157
158    /// Event that sends to this receiver.
159    pub fn event(&self) -> Event<A> {
160        self.event
161    }
162}
163impl<A> From<EventReceiver<A>> for flume::Receiver<A>
164where
165    A: EventArgs + Send,
166{
167    fn from(e: EventReceiver<A>) -> Self {
168        e.receiver
169    }
170}
171impl<'a, A> IntoIterator for &'a EventReceiver<A>
172where
173    A: EventArgs + Send,
174{
175    type Item = A;
176
177    type IntoIter = flume::Iter<'a, A>;
178
179    fn into_iter(self) -> Self::IntoIter {
180        self.receiver.iter()
181    }
182}
183impl<A> IntoIterator for EventReceiver<A>
184where
185    A: EventArgs + Send,
186{
187    type Item = A;
188
189    type IntoIter = flume::IntoIter<A>;
190
191    fn into_iter(self) -> Self::IntoIter {
192        self.receiver.into_iter()
193    }
194}
195
196/// A future that receives a single message from a running app.
197struct RecvFut<'a, M>(flume::r#async::RecvFut<'a, M>);
198impl<'a, M> From<flume::r#async::RecvFut<'a, M>> for RecvFut<'a, M> {
199    fn from(f: flume::r#async::RecvFut<'a, M>) -> Self {
200        Self(f)
201    }
202}
203impl<M> Future for RecvFut<'_, M> {
204    type Output = Result<M, AppChannelError>;
205
206    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
207        match std::pin::Pin::new(&mut self.0).poll(cx) {
208            std::task::Poll::Ready(r) => std::task::Poll::Ready(r.map_err(|_| AppChannelError::Disconnected)),
209            std::task::Poll::Pending => std::task::Poll::Pending,
210        }
211    }
212}