1use std::time::Duration;
2
3use crate::{AppChannelError, AppEventSender, update::UpdatesTrace};
4
5use super::*;
6
7pub(crate) struct EventUpdateMsg {
8 args: Box<dyn FnOnce() -> EventUpdate + Send>,
9}
10impl EventUpdateMsg {
11 pub(crate) fn get(self) -> EventUpdate {
12 (self.args)()
13 }
14}
15impl fmt::Debug for EventUpdateMsg {
16 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
17 f.debug_struct("EventUpdateMsg").finish_non_exhaustive()
18 }
19}
20
21pub struct EventSender<A>
25where
26 A: EventArgs + Send,
27{
28 pub(super) sender: AppEventSender,
29 pub(super) event: Event<A>,
30}
31impl<A> Clone for EventSender<A>
32where
33 A: EventArgs + Send,
34{
35 fn clone(&self) -> Self {
36 EventSender {
37 sender: self.sender.clone(),
38 event: self.event,
39 }
40 }
41}
42impl<A> fmt::Debug for EventSender<A>
43where
44 A: EventArgs + Send,
45{
46 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47 write!(f, "EventSender({:?})", &self.event)
48 }
49}
50impl<A> EventSender<A>
51where
52 A: EventArgs + Send,
53{
54 pub fn send(&self, args: A) -> Result<(), AppChannelError> {
56 UpdatesTrace::log_event(self.event.as_any());
57
58 let event = self.event;
59 let msg = EventUpdateMsg {
60 args: Box::new(move || event.new_update(args)),
61 };
62
63 self.sender.send_event(msg).map_err(|_| AppChannelError::Disconnected)
64 }
65
66 pub fn event(&self) -> Event<A> {
68 self.event
69 }
70}
71
72#[must_use = "stops receiving on drop"]
76pub struct EventReceiver<A>
77where
78 A: EventArgs + Send,
79{
80 pub(super) event: Event<A>,
81 pub(super) receiver: flume::Receiver<A>,
82}
83impl<A> Clone for EventReceiver<A>
84where
85 A: EventArgs + Send,
86{
87 fn clone(&self) -> Self {
88 EventReceiver {
89 event: self.event,
90 receiver: self.receiver.clone(),
91 }
92 }
93}
94impl<A> fmt::Debug for EventReceiver<A>
95where
96 A: EventArgs + Send,
97{
98 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99 write!(f, "EventSender({:?})", &self.event)
100 }
101}
102impl<A> EventReceiver<A>
103where
104 A: EventArgs + Send,
105{
106 pub fn recv(&self) -> Result<A, AppChannelError> {
110 self.receiver.recv().map_err(|_| AppChannelError::Disconnected)
111 }
112
113 pub fn try_recv(&self) -> Result<Option<A>, AppChannelError> {
117 match self.receiver.try_recv() {
118 Ok(a) => Ok(Some(a)),
119 Err(e) => match e {
120 flume::TryRecvError::Empty => Ok(None),
121 flume::TryRecvError::Disconnected => Err(AppChannelError::Disconnected),
122 },
123 }
124 }
125
126 pub fn recv_deadline(&self, deadline: Instant) -> Result<A, AppChannelError> {
128 self.receiver.recv_deadline(deadline).map_err(AppChannelError::from)
129 }
130
131 pub fn recv_timeout(&self, dur: Duration) -> Result<A, AppChannelError> {
133 self.receiver.recv_timeout(dur).map_err(AppChannelError::from)
134 }
135
136 pub async fn recv_async(&self) -> Result<A, AppChannelError> {
138 RecvFut::from(self.receiver.recv_async()).await
139 }
140
141 pub fn into_recv_async(self) -> impl Future<Output = Result<A, AppChannelError>> + Send + Sync + 'static {
143 RecvFut::from(self.receiver.into_recv_async())
144 }
145
146 pub fn iter(&self) -> flume::Iter<'_, A> {
149 self.receiver.iter()
150 }
151
152 pub fn try_iter(&self) -> flume::TryIter<'_, A> {
155 self.receiver.try_iter()
156 }
157
158 pub fn event(&self) -> Event<A> {
160 self.event
161 }
162}
163impl<A> From<EventReceiver<A>> for flume::Receiver<A>
164where
165 A: EventArgs + Send,
166{
167 fn from(e: EventReceiver<A>) -> Self {
168 e.receiver
169 }
170}
171impl<'a, A> IntoIterator for &'a EventReceiver<A>
172where
173 A: EventArgs + Send,
174{
175 type Item = A;
176
177 type IntoIter = flume::Iter<'a, A>;
178
179 fn into_iter(self) -> Self::IntoIter {
180 self.receiver.iter()
181 }
182}
183impl<A> IntoIterator for EventReceiver<A>
184where
185 A: EventArgs + Send,
186{
187 type Item = A;
188
189 type IntoIter = flume::IntoIter<A>;
190
191 fn into_iter(self) -> Self::IntoIter {
192 self.receiver.into_iter()
193 }
194}
195
196struct RecvFut<'a, M>(flume::r#async::RecvFut<'a, M>);
198impl<'a, M> From<flume::r#async::RecvFut<'a, M>> for RecvFut<'a, M> {
199 fn from(f: flume::r#async::RecvFut<'a, M>) -> Self {
200 Self(f)
201 }
202}
203impl<M> Future for RecvFut<'_, M> {
204 type Output = Result<M, AppChannelError>;
205
206 fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
207 match std::pin::Pin::new(&mut self.0).poll(cx) {
208 std::task::Poll::Ready(r) => std::task::Poll::Ready(r.map_err(|_| AppChannelError::Disconnected)),
209 std::task::Poll::Pending => std::task::Poll::Pending,
210 }
211 }
212}