zng_task/
channel.rs

1//! Async channels.
2//!
3//! The channel can work across UI tasks and parallel tasks, it can be [`bounded`] or [`unbounded`] and is MPMC.
4//!
5//! This module is a thin wrapper around the [`flume`] crate's channel that just limits the API
6//! surface to only `async` methods. You can convert from/into that [`flume`] channel.
7//!
8//! # Examples
9//!
10//! ```no_run
11//! use zng_task::{self as task, channel};
12//! # use zng_unit::*;
13//!
14//! let (sender, receiver) = channel::bounded(5);
15//!
16//! task::spawn(async move {
17//!     task::deadline(5.secs()).await;
18//!     if let Err(e) = sender.send("Data!").await {
19//!         eprintln!("no receiver connected, did not send message: '{}'", e.0)
20//!     }
21//! });
22//! task::spawn(async move {
23//!     match receiver.recv().await {
24//!         Ok(msg) => println!("{msg}"),
25//!         Err(_) => eprintln!("no message in channel and no sender connected")
26//!     }
27//! });
28//! ```
29//!
30//! [`flume`]: https://docs.rs/flume/0.10.7/flume/
31
32use std::{convert::TryFrom, fmt};
33
34pub use flume::{RecvError, RecvTimeoutError, SendError, SendTimeoutError};
35
36use zng_time::Deadline;
37
38/// The transmitting end of an unbounded channel.
39///
40/// Use [`unbounded`] to create a channel.
41pub struct UnboundSender<T>(flume::Sender<T>);
42impl<T> fmt::Debug for UnboundSender<T> {
43    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44        write!(f, "UnboundSender<{}>", pretty_type_name::pretty_type_name::<T>())
45    }
46}
47impl<T> Clone for UnboundSender<T> {
48    fn clone(&self) -> Self {
49        UnboundSender(self.0.clone())
50    }
51}
52impl<T> TryFrom<flume::Sender<T>> for UnboundSender<T> {
53    type Error = flume::Sender<T>;
54
55    /// Convert to [`UnboundSender`] if the flume sender is unbound.
56    fn try_from(value: flume::Sender<T>) -> Result<Self, Self::Error> {
57        if value.capacity().is_none() {
58            Ok(UnboundSender(value))
59        } else {
60            Err(value)
61        }
62    }
63}
64impl<T> From<UnboundSender<T>> for flume::Sender<T> {
65    fn from(s: UnboundSender<T>) -> Self {
66        s.0
67    }
68}
69impl<T> UnboundSender<T> {
70    /// Send a value into the channel.
71    ///
72    /// If the messages are not received they accumulate in the channel buffer.
73    ///
74    /// Returns an error if all receivers have been dropped.
75    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
76        self.0.send(msg)
77    }
78
79    /// Returns `true` if all receivers for this channel have been dropped.
80    pub fn is_disconnected(&self) -> bool {
81        self.0.is_disconnected()
82    }
83
84    /// Returns `true` if the channel is empty.
85    pub fn is_empty(&self) -> bool {
86        self.0.is_empty()
87    }
88
89    /// Returns the number of messages in the channel.
90    pub fn len(&self) -> usize {
91        self.0.len()
92    }
93}
94
95/// The transmitting end of a channel.
96///
97/// Use [`bounded`] or [`rendezvous`] to create a channel. You can also convert an [`UnboundSender`] into this one.
98pub struct Sender<T>(flume::Sender<T>);
99impl<T> fmt::Debug for Sender<T> {
100    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101        write!(f, "Sender<{}>", pretty_type_name::pretty_type_name::<T>())
102    }
103}
104impl<T> Clone for Sender<T> {
105    fn clone(&self) -> Self {
106        Sender(self.0.clone())
107    }
108}
109impl<T> From<flume::Sender<T>> for Sender<T> {
110    fn from(s: flume::Sender<T>) -> Self {
111        Sender(s)
112    }
113}
114impl<T> From<Sender<T>> for flume::Sender<T> {
115    fn from(s: Sender<T>) -> Self {
116        s.0
117    }
118}
119impl<T> Sender<T> {
120    /// Send a value into the channel.
121    ///
122    /// Waits until there is space in the channel buffer.
123    ///
124    /// Returns an error if all receivers have been dropped.
125    pub async fn send(&self, msg: T) -> Result<(), SendError<T>> {
126        self.0.send_async(msg).await
127    }
128
129    /// Send a value into the channel.
130    ///
131    /// Waits until there is space in the channel buffer or the `deadline` is reached.
132    ///
133    /// Returns an error if all receivers have been dropped or the `deadline` is reached. The `msg` is lost in case of timeout.
134    pub async fn send_deadline(&self, msg: T, deadline: impl Into<Deadline>) -> Result<(), SendTimeoutError<Option<T>>> {
135        match super::with_deadline(self.send(msg), deadline).await {
136            Ok(r) => match r {
137                Ok(_) => Ok(()),
138                Err(e) => Err(SendTimeoutError::Disconnected(Some(e.0))),
139            },
140            Err(_) => Err(SendTimeoutError::Timeout(None)),
141        }
142    }
143
144    /// Returns `true` if all receivers for this channel have been dropped.
145    pub fn is_disconnected(&self) -> bool {
146        self.0.is_disconnected()
147    }
148
149    /// Returns `true` if the channel is empty.
150    ///
151    /// Note: [`rendezvous`] channels are always empty.
152    pub fn is_empty(&self) -> bool {
153        self.0.is_empty()
154    }
155
156    /// Returns `true` if the channel is full.
157    ///
158    /// Note: [`rendezvous`] channels are always full and [`unbounded`] channels are never full.
159    pub fn is_full(&self) -> bool {
160        self.0.is_full()
161    }
162
163    /// Returns the number of messages in the channel.
164    pub fn len(&self) -> usize {
165        self.0.len()
166    }
167
168    /// If the channel is bounded, returns its capacity.
169    pub fn capacity(&self) -> Option<usize> {
170        self.0.capacity()
171    }
172}
173
174/// The receiving end of a channel.
175///
176/// Use [`bounded`],[`unbounded`] or [`rendezvous`] to create a channel.
177///
178/// # Work Stealing
179///
180/// Cloning the receiver **does not** turn this channel into a broadcast channel.
181/// Each message will only be received by a single receiver. You can use this to
182/// to implement work stealing.
183pub struct Receiver<T>(flume::Receiver<T>);
184impl<T> fmt::Debug for Receiver<T> {
185    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
186        write!(f, "Receiver<{}>", pretty_type_name::pretty_type_name::<T>())
187    }
188}
189impl<T> Clone for Receiver<T> {
190    fn clone(&self) -> Self {
191        Receiver(self.0.clone())
192    }
193}
194impl<T> Receiver<T> {
195    /// Wait for an incoming value from the channel associated with this receiver.
196    ///
197    /// Returns an error if all senders have been dropped.
198    pub async fn recv(&self) -> Result<T, RecvError> {
199        self.0.recv_async().await
200    }
201
202    /// Wait for an incoming value from the channel associated with this receiver.
203    ///
204    /// Returns an error if all senders have been dropped or the `deadline` is reached.
205    pub async fn recv_deadline(&self, deadline: impl Into<Deadline>) -> Result<T, RecvTimeoutError> {
206        match super::with_deadline(self.recv(), deadline).await {
207            Ok(r) => match r {
208                Ok(m) => Ok(m),
209                Err(_) => Err(RecvTimeoutError::Disconnected),
210            },
211            Err(_) => Err(RecvTimeoutError::Timeout),
212        }
213    }
214
215    /// Returns `true` if all senders for this channel have been dropped.
216    pub fn is_disconnected(&self) -> bool {
217        self.0.is_disconnected()
218    }
219
220    /// Returns `true` if the channel is empty.
221    ///
222    /// Note: [`rendezvous`] channels are always empty.
223    pub fn is_empty(&self) -> bool {
224        self.0.is_empty()
225    }
226
227    /// Returns `true` if the channel is full.
228    ///
229    /// Note: [`rendezvous`] channels are always full and [`unbounded`] channels are never full.
230    pub fn is_full(&self) -> bool {
231        self.0.is_full()
232    }
233
234    /// Returns the number of messages in the channel.
235    pub fn len(&self) -> usize {
236        self.0.len()
237    }
238
239    /// If the channel is bounded, returns its capacity.
240    pub fn capacity(&self) -> Option<usize> {
241        self.0.capacity()
242    }
243
244    /// Takes all sitting in the channel.
245    pub fn drain(&self) -> flume::Drain<T> {
246        self.0.drain()
247    }
248}
249
250/// Create a channel with no maximum capacity.
251///
252/// Unbound channels always [`send`] messages immediately, never yielding on await.
253/// If the messages are no [received] they accumulate in the channel buffer.
254///
255/// # Examples
256///
257/// The example [spawns] two parallel tasks, the receiver task takes a while to start receiving but then
258/// rapidly consumes all messages in the buffer and new messages as they are send.
259///
260/// ```no_run
261/// use zng_task::{self as task, channel};
262/// # use zng_unit::*;
263///
264/// let (sender, receiver) = channel::unbounded();
265///
266/// task::spawn(async move {
267///     for msg in ["Hello!", "Are you still there?"].into_iter().cycle() {
268///         task::deadline(300.ms()).await;
269///         if let Err(e) = sender.send(msg) {
270///             eprintln!("no receiver connected, the message `{}` was not send", e.0);
271///             break;
272///         }
273///     }
274/// });
275/// task::spawn(async move {
276///     task::deadline(5.secs()).await;
277///     
278///     loop {
279///         match receiver.recv().await {
280///             Ok(msg) => println!("{msg}"),
281///             Err(_) => {
282///                 eprintln!("no message in channel and no sender connected");
283///                 break;
284///             }
285///         }
286///     }
287/// });
288/// ```
289///
290/// Note that you don't need to `.await` on [`send`] as there is always space in the channel buffer.
291///
292/// [`send`]: UnboundSender::send
293/// [received]: Receiver::recv
294/// [spawns]: crate::spawn
295pub fn unbounded<T>() -> (UnboundSender<T>, Receiver<T>) {
296    let (s, r) = flume::unbounded();
297    (UnboundSender(s), Receiver(r))
298}
299
300/// Create a channel with a maximum capacity.
301///
302/// Bounded channels [`send`] until the channel reaches its capacity then it awaits until a message
303/// is [received] before sending another message.
304///
305/// # Examples
306///
307/// The example [spawns] two parallel tasks, the receiver task takes a while to start receiving but then
308/// rapidly consumes the 2 messages in the buffer and unblocks the sender to send more messages.
309///
310/// ```no_run
311/// use zng_task::{self as task, channel};
312/// # use zng_unit::*;
313///
314/// let (sender, receiver) = channel::bounded(2);
315///
316/// task::spawn(async move {
317///     for msg in ["Hello!", "Data!"].into_iter().cycle() {
318///         task::deadline(300.ms()).await;
319///         if let Err(e) = sender.send(msg).await {
320///             eprintln!("no receiver connected, the message `{}` was not send", e.0);
321///             break;
322///         }
323///     }
324/// });
325/// task::spawn(async move {
326///     task::deadline(5.secs()).await;
327///     
328///     loop {
329///         match receiver.recv().await {
330///             Ok(msg) => println!("{msg}"),
331///             Err(_) => {
332///                 eprintln!("no message in channel and no sender connected");
333///                 break;
334///             }
335///         }
336///     }
337/// });
338/// ```
339///
340/// [`send`]: UnboundSender::send
341/// [received]: Receiver::recv
342/// [spawns]: crate::spawn
343pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
344    let (s, r) = flume::bounded(capacity);
345    (Sender(s), Receiver(r))
346}
347
348/// Create a [`bounded`] channel with `0` capacity.
349///
350/// Rendezvous channels always awaits until the message is [received] to *return* from [`send`], there is no buffer.
351///
352/// # Examples
353///
354/// The example [spawns] two parallel tasks, the sender and receiver *handshake* when transferring the message, the
355/// receiver takes 2 seconds to receive, so the sender takes 2 seconds to send.
356///
357/// ```no_run
358/// use zng_task::{self as task, channel};
359/// # use zng_unit::*;
360/// # use std::time::*;
361/// # use zng_time::*;
362///
363/// let (sender, receiver) = channel::rendezvous();
364///
365/// task::spawn(async move {
366///     loop {
367///         let t = INSTANT.now();
368///
369///         if let Err(e) = sender.send("the stuff").await {
370///             eprintln!(r#"failed to send "{}", no receiver connected"#, e.0);
371///             break;
372///         }
373///
374///         assert!(t.elapsed() >= 2.secs());
375///     }
376/// });
377/// task::spawn(async move {
378///     loop {
379///         task::deadline(2.secs()).await;
380///
381///         match receiver.recv().await {
382///             Ok(msg) => println!(r#"got "{msg}""#),
383///             Err(_) => {
384///                 eprintln!("no sender connected");
385///                 break;
386///             }
387///         }
388///     }
389/// });
390/// ```
391///
392/// [`send`]: UnboundSender::send
393/// [received]: Receiver::recv
394/// [spawns]: crate::spawn
395pub fn rendezvous<T>() -> (Sender<T>, Receiver<T>) {
396    bounded::<T>(0)
397}