zng_app/event/
channel.rs
1use std::time::Duration;
2
3use crate::{AppEventSender, update::UpdatesTrace};
4
5use super::*;
6
7pub(crate) struct EventUpdateMsg {
8 args: Box<dyn FnOnce() -> EventUpdate + Send>,
9}
10impl EventUpdateMsg {
11 pub(crate) fn get(self) -> EventUpdate {
12 (self.args)()
13 }
14}
15impl fmt::Debug for EventUpdateMsg {
16 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
17 f.debug_struct("EventUpdateMsg").finish_non_exhaustive()
18 }
19}
20
21pub struct EventSender<A>
25where
26 A: EventArgs + Send,
27{
28 pub(super) sender: AppEventSender,
29 pub(super) event: Event<A>,
30}
31impl<A> Clone for EventSender<A>
32where
33 A: EventArgs + Send,
34{
35 fn clone(&self) -> Self {
36 EventSender {
37 sender: self.sender.clone(),
38 event: self.event,
39 }
40 }
41}
42impl<A> fmt::Debug for EventSender<A>
43where
44 A: EventArgs + Send,
45{
46 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47 write!(f, "EventSender({:?})", &self.event)
48 }
49}
50impl<A> EventSender<A>
51where
52 A: EventArgs + Send,
53{
54 pub fn send(&self, args: A) -> Result<(), AppDisconnected<A>> {
56 UpdatesTrace::log_event(self.event.as_any());
57
58 let event = self.event;
59 let msg = EventUpdateMsg {
60 args: Box::new(move || event.new_update(args)),
61 };
62
63 self.sender.send_event(msg).map_err(|e| {
64 if let Some(args) = (e.0.args)().args.as_any().downcast_ref::<A>() {
65 AppDisconnected(args.clone())
66 } else {
67 unreachable!()
68 }
69 })
70 }
71
72 pub fn event(&self) -> Event<A> {
74 self.event
75 }
76}
77
78#[must_use = "stops receiving on drop"]
82pub struct EventReceiver<A>
83where
84 A: EventArgs + Send,
85{
86 pub(super) event: Event<A>,
87 pub(super) receiver: flume::Receiver<A>,
88}
89impl<A> Clone for EventReceiver<A>
90where
91 A: EventArgs + Send,
92{
93 fn clone(&self) -> Self {
94 EventReceiver {
95 event: self.event,
96 receiver: self.receiver.clone(),
97 }
98 }
99}
100impl<A> fmt::Debug for EventReceiver<A>
101where
102 A: EventArgs + Send,
103{
104 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105 write!(f, "EventSender({:?})", &self.event)
106 }
107}
108impl<A> EventReceiver<A>
109where
110 A: EventArgs + Send,
111{
112 pub fn recv(&self) -> Result<A, AppDisconnected<()>> {
116 self.receiver.recv().map_err(|_| AppDisconnected(()))
117 }
118
119 pub fn try_recv(&self) -> Result<A, Option<AppDisconnected<()>>> {
123 self.receiver.try_recv().map_err(|e| match e {
124 flume::TryRecvError::Empty => None,
125 flume::TryRecvError::Disconnected => Some(AppDisconnected(())),
126 })
127 }
128
129 pub fn recv_deadline(&self, deadline: Instant) -> Result<A, TimeoutOrAppDisconnected> {
131 self.receiver.recv_deadline(deadline).map_err(TimeoutOrAppDisconnected::from)
132 }
133
134 pub fn recv_timeout(&self, dur: Duration) -> Result<A, TimeoutOrAppDisconnected> {
136 self.receiver.recv_timeout(dur).map_err(TimeoutOrAppDisconnected::from)
137 }
138
139 pub async fn recv_async(&self) -> Result<A, AppDisconnected<()>> {
141 RecvFut::from(self.receiver.recv_async()).await
142 }
143
144 pub fn into_recv_async(self) -> impl Future<Output = Result<A, AppDisconnected<()>>> + Send + Sync + 'static {
146 RecvFut::from(self.receiver.into_recv_async())
147 }
148
149 pub fn iter(&self) -> flume::Iter<A> {
152 self.receiver.iter()
153 }
154
155 pub fn try_iter(&self) -> flume::TryIter<A> {
158 self.receiver.try_iter()
159 }
160
161 pub fn event(&self) -> Event<A> {
163 self.event
164 }
165}
166impl<A> From<EventReceiver<A>> for flume::Receiver<A>
167where
168 A: EventArgs + Send,
169{
170 fn from(e: EventReceiver<A>) -> Self {
171 e.receiver
172 }
173}
174impl<'a, A> IntoIterator for &'a EventReceiver<A>
175where
176 A: EventArgs + Send,
177{
178 type Item = A;
179
180 type IntoIter = flume::Iter<'a, A>;
181
182 fn into_iter(self) -> Self::IntoIter {
183 self.receiver.iter()
184 }
185}
186impl<A> IntoIterator for EventReceiver<A>
187where
188 A: EventArgs + Send,
189{
190 type Item = A;
191
192 type IntoIter = flume::IntoIter<A>;
193
194 fn into_iter(self) -> Self::IntoIter {
195 self.receiver.into_iter()
196 }
197}
198
/// Error for a send or receive that failed because the app side of the channel disconnected.
///
/// Send errors carry the value that could not be sent; receive errors carry `()`.
pub struct AppDisconnected<T>(pub T);
203impl From<flume::RecvError> for AppDisconnected<()> {
204 fn from(_: flume::RecvError) -> Self {
205 AppDisconnected(())
206 }
207}
208impl<T> From<flume::SendError<T>> for AppDisconnected<T> {
209 fn from(e: flume::SendError<T>) -> Self {
210 AppDisconnected(e.0)
211 }
212}
213impl<T> fmt::Debug for AppDisconnected<T> {
214 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215 write!(f, "AppDisconnected<{}>", pretty_type_name::pretty_type_name::<T>())
216 }
217}
218impl<T> fmt::Display for AppDisconnected<T> {
219 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220 write!(f, "cannot send/receive because the app has disconnected")
221 }
222}
223impl<T> std::error::Error for AppDisconnected<T> {}
224
/// Error of a channel operation that can fail by timing out or by the app disconnecting.
pub enum TimeoutOrAppDisconnected {
    /// The operation did not complete before its deadline or timeout.
    Timeout,
    /// The channel is disconnected.
    AppDisconnected,
}
232impl From<flume::RecvTimeoutError> for TimeoutOrAppDisconnected {
233 fn from(e: flume::RecvTimeoutError) -> Self {
234 match e {
235 flume::RecvTimeoutError::Timeout => TimeoutOrAppDisconnected::Timeout,
236 flume::RecvTimeoutError::Disconnected => TimeoutOrAppDisconnected::AppDisconnected,
237 }
238 }
239}
240impl fmt::Debug for TimeoutOrAppDisconnected {
241 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
242 if f.alternate() {
243 write!(f, "TimeoutOrAppDisconnected::")?;
244 }
245 match self {
246 TimeoutOrAppDisconnected::Timeout => write!(f, "Timeout"),
247 TimeoutOrAppDisconnected::AppDisconnected => write!(f, "AppDisconnected"),
248 }
249 }
250}
251impl fmt::Display for TimeoutOrAppDisconnected {
252 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
253 match self {
254 TimeoutOrAppDisconnected::Timeout => write!(f, "failed send, timeout"),
255 TimeoutOrAppDisconnected::AppDisconnected => write!(f, "cannot send because the app has disconnected"),
256 }
257 }
258}
259impl std::error::Error for TimeoutOrAppDisconnected {}
260
261struct RecvFut<'a, M>(flume::r#async::RecvFut<'a, M>);
263impl<'a, M> From<flume::r#async::RecvFut<'a, M>> for RecvFut<'a, M> {
264 fn from(f: flume::r#async::RecvFut<'a, M>) -> Self {
265 Self(f)
266 }
267}
268impl<M> Future for RecvFut<'_, M> {
269 type Output = Result<M, AppDisconnected<()>>;
270
271 fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
272 match std::pin::Pin::new(&mut self.0).poll(cx) {
273 std::task::Poll::Ready(r) => std::task::Poll::Ready(r.map_err(|_| AppDisconnected(()))),
274 std::task::Poll::Pending => std::task::Poll::Pending,
275 }
276 }
277}