zng_task/channel.rs
1//! Async channels.
2//!
3//! The channel can work across UI tasks and parallel tasks, it can be [`bounded`] or [`unbounded`] and is MPMC.
4//!
5//! This module is a thin wrapper around the [`flume`] crate's channel that just limits the API
6//! surface to only `async` methods. You can convert from/into that [`flume`] channel.
7//!
8//! # Examples
9//!
10//! ```no_run
11//! use zng_task::{self as task, channel};
12//! # use zng_unit::*;
13//!
14//! let (sender, receiver) = channel::bounded(5);
15//!
16//! task::spawn(async move {
17//! task::deadline(5.secs()).await;
18//! if let Err(e) = sender.send("Data!").await {
19//! eprintln!("no receiver connected, did not send message: '{}'", e.0)
20//! }
21//! });
22//! task::spawn(async move {
23//! match receiver.recv().await {
24//! Ok(msg) => println!("{msg}"),
25//! Err(_) => eprintln!("no message in channel and no sender connected")
26//! }
27//! });
28//! ```
29//!
30//! [`flume`]: https://docs.rs/flume/0.10.7/flume/
31
32use std::{convert::TryFrom, fmt};
33
34pub use flume::{RecvError, RecvTimeoutError, SendError, SendTimeoutError};
35
36use zng_time::Deadline;
37
38/// The transmitting end of an unbounded channel.
39///
40/// Use [`unbounded`] to create a channel.
41pub struct UnboundSender<T>(flume::Sender<T>);
42impl<T> fmt::Debug for UnboundSender<T> {
43 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44 write!(f, "UnboundSender<{}>", pretty_type_name::pretty_type_name::<T>())
45 }
46}
47impl<T> Clone for UnboundSender<T> {
48 fn clone(&self) -> Self {
49 UnboundSender(self.0.clone())
50 }
51}
52impl<T> TryFrom<flume::Sender<T>> for UnboundSender<T> {
53 type Error = flume::Sender<T>;
54
55 /// Convert to [`UnboundSender`] if the flume sender is unbound.
56 fn try_from(value: flume::Sender<T>) -> Result<Self, Self::Error> {
57 if value.capacity().is_none() {
58 Ok(UnboundSender(value))
59 } else {
60 Err(value)
61 }
62 }
63}
64impl<T> From<UnboundSender<T>> for flume::Sender<T> {
65 fn from(s: UnboundSender<T>) -> Self {
66 s.0
67 }
68}
69impl<T> UnboundSender<T> {
70 /// Send a value into the channel.
71 ///
72 /// If the messages are not received they accumulate in the channel buffer.
73 ///
74 /// Returns an error if all receivers have been dropped.
75 pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
76 self.0.send(msg)
77 }
78
79 /// Returns `true` if all receivers for this channel have been dropped.
80 pub fn is_disconnected(&self) -> bool {
81 self.0.is_disconnected()
82 }
83
84 /// Returns `true` if the channel is empty.
85 pub fn is_empty(&self) -> bool {
86 self.0.is_empty()
87 }
88
89 /// Returns the number of messages in the channel.
90 pub fn len(&self) -> usize {
91 self.0.len()
92 }
93}
94
95/// The transmitting end of a channel.
96///
97/// Use [`bounded`] or [`rendezvous`] to create a channel. You can also convert an [`UnboundSender`] into this one.
98pub struct Sender<T>(flume::Sender<T>);
99impl<T> fmt::Debug for Sender<T> {
100 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101 write!(f, "Sender<{}>", pretty_type_name::pretty_type_name::<T>())
102 }
103}
104impl<T> Clone for Sender<T> {
105 fn clone(&self) -> Self {
106 Sender(self.0.clone())
107 }
108}
109impl<T> From<flume::Sender<T>> for Sender<T> {
110 fn from(s: flume::Sender<T>) -> Self {
111 Sender(s)
112 }
113}
114impl<T> From<Sender<T>> for flume::Sender<T> {
115 fn from(s: Sender<T>) -> Self {
116 s.0
117 }
118}
119impl<T> Sender<T> {
120 /// Send a value into the channel.
121 ///
122 /// Waits until there is space in the channel buffer.
123 ///
124 /// Returns an error if all receivers have been dropped.
125 pub async fn send(&self, msg: T) -> Result<(), SendError<T>> {
126 self.0.send_async(msg).await
127 }
128
129 /// Send a value into the channel.
130 ///
131 /// Waits until there is space in the channel buffer or the `deadline` is reached.
132 ///
133 /// Returns an error if all receivers have been dropped or the `deadline` is reached. The `msg` is lost in case of timeout.
134 pub async fn send_deadline(&self, msg: T, deadline: impl Into<Deadline>) -> Result<(), SendTimeoutError<Option<T>>> {
135 match super::with_deadline(self.send(msg), deadline).await {
136 Ok(r) => match r {
137 Ok(_) => Ok(()),
138 Err(e) => Err(SendTimeoutError::Disconnected(Some(e.0))),
139 },
140 Err(_) => Err(SendTimeoutError::Timeout(None)),
141 }
142 }
143
144 /// Returns `true` if all receivers for this channel have been dropped.
145 pub fn is_disconnected(&self) -> bool {
146 self.0.is_disconnected()
147 }
148
149 /// Returns `true` if the channel is empty.
150 ///
151 /// Note: [`rendezvous`] channels are always empty.
152 pub fn is_empty(&self) -> bool {
153 self.0.is_empty()
154 }
155
156 /// Returns `true` if the channel is full.
157 ///
158 /// Note: [`rendezvous`] channels are always full and [`unbounded`] channels are never full.
159 pub fn is_full(&self) -> bool {
160 self.0.is_full()
161 }
162
163 /// Returns the number of messages in the channel.
164 pub fn len(&self) -> usize {
165 self.0.len()
166 }
167
168 /// If the channel is bounded, returns its capacity.
169 pub fn capacity(&self) -> Option<usize> {
170 self.0.capacity()
171 }
172}
173
174/// The receiving end of a channel.
175///
176/// Use [`bounded`],[`unbounded`] or [`rendezvous`] to create a channel.
177///
178/// # Work Stealing
179///
180/// Cloning the receiver **does not** turn this channel into a broadcast channel.
181/// Each message will only be received by a single receiver. You can use this to
182/// to implement work stealing.
183pub struct Receiver<T>(flume::Receiver<T>);
184impl<T> fmt::Debug for Receiver<T> {
185 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
186 write!(f, "Receiver<{}>", pretty_type_name::pretty_type_name::<T>())
187 }
188}
189impl<T> Clone for Receiver<T> {
190 fn clone(&self) -> Self {
191 Receiver(self.0.clone())
192 }
193}
194impl<T> Receiver<T> {
195 /// Wait for an incoming value from the channel associated with this receiver.
196 ///
197 /// Returns an error if all senders have been dropped.
198 pub async fn recv(&self) -> Result<T, RecvError> {
199 self.0.recv_async().await
200 }
201
202 /// Wait for an incoming value from the channel associated with this receiver.
203 ///
204 /// Returns an error if all senders have been dropped or the `deadline` is reached.
205 pub async fn recv_deadline(&self, deadline: impl Into<Deadline>) -> Result<T, RecvTimeoutError> {
206 match super::with_deadline(self.recv(), deadline).await {
207 Ok(r) => match r {
208 Ok(m) => Ok(m),
209 Err(_) => Err(RecvTimeoutError::Disconnected),
210 },
211 Err(_) => Err(RecvTimeoutError::Timeout),
212 }
213 }
214
215 /// Returns `true` if all senders for this channel have been dropped.
216 pub fn is_disconnected(&self) -> bool {
217 self.0.is_disconnected()
218 }
219
220 /// Returns `true` if the channel is empty.
221 ///
222 /// Note: [`rendezvous`] channels are always empty.
223 pub fn is_empty(&self) -> bool {
224 self.0.is_empty()
225 }
226
227 /// Returns `true` if the channel is full.
228 ///
229 /// Note: [`rendezvous`] channels are always full and [`unbounded`] channels are never full.
230 pub fn is_full(&self) -> bool {
231 self.0.is_full()
232 }
233
234 /// Returns the number of messages in the channel.
235 pub fn len(&self) -> usize {
236 self.0.len()
237 }
238
239 /// If the channel is bounded, returns its capacity.
240 pub fn capacity(&self) -> Option<usize> {
241 self.0.capacity()
242 }
243
244 /// Takes all sitting in the channel.
245 pub fn drain(&self) -> flume::Drain<T> {
246 self.0.drain()
247 }
248}
249
250/// Create a channel with no maximum capacity.
251///
252/// Unbound channels always [`send`] messages immediately, never yielding on await.
253/// If the messages are no [received] they accumulate in the channel buffer.
254///
255/// # Examples
256///
257/// The example [spawns] two parallel tasks, the receiver task takes a while to start receiving but then
258/// rapidly consumes all messages in the buffer and new messages as they are send.
259///
260/// ```no_run
261/// use zng_task::{self as task, channel};
262/// # use zng_unit::*;
263///
264/// let (sender, receiver) = channel::unbounded();
265///
266/// task::spawn(async move {
267/// for msg in ["Hello!", "Are you still there?"].into_iter().cycle() {
268/// task::deadline(300.ms()).await;
269/// if let Err(e) = sender.send(msg) {
270/// eprintln!("no receiver connected, the message `{}` was not send", e.0);
271/// break;
272/// }
273/// }
274/// });
275/// task::spawn(async move {
276/// task::deadline(5.secs()).await;
277///
278/// loop {
279/// match receiver.recv().await {
280/// Ok(msg) => println!("{msg}"),
281/// Err(_) => {
282/// eprintln!("no message in channel and no sender connected");
283/// break;
284/// }
285/// }
286/// }
287/// });
288/// ```
289///
290/// Note that you don't need to `.await` on [`send`] as there is always space in the channel buffer.
291///
292/// [`send`]: UnboundSender::send
293/// [received]: Receiver::recv
294/// [spawns]: crate::spawn
295pub fn unbounded<T>() -> (UnboundSender<T>, Receiver<T>) {
296 let (s, r) = flume::unbounded();
297 (UnboundSender(s), Receiver(r))
298}
299
300/// Create a channel with a maximum capacity.
301///
302/// Bounded channels [`send`] until the channel reaches its capacity then it awaits until a message
303/// is [received] before sending another message.
304///
305/// # Examples
306///
307/// The example [spawns] two parallel tasks, the receiver task takes a while to start receiving but then
308/// rapidly consumes the 2 messages in the buffer and unblocks the sender to send more messages.
309///
310/// ```no_run
311/// use zng_task::{self as task, channel};
312/// # use zng_unit::*;
313///
314/// let (sender, receiver) = channel::bounded(2);
315///
316/// task::spawn(async move {
317/// for msg in ["Hello!", "Data!"].into_iter().cycle() {
318/// task::deadline(300.ms()).await;
319/// if let Err(e) = sender.send(msg).await {
320/// eprintln!("no receiver connected, the message `{}` was not send", e.0);
321/// break;
322/// }
323/// }
324/// });
325/// task::spawn(async move {
326/// task::deadline(5.secs()).await;
327///
328/// loop {
329/// match receiver.recv().await {
330/// Ok(msg) => println!("{msg}"),
331/// Err(_) => {
332/// eprintln!("no message in channel and no sender connected");
333/// break;
334/// }
335/// }
336/// }
337/// });
338/// ```
339///
340/// [`send`]: UnboundSender::send
341/// [received]: Receiver::recv
342/// [spawns]: crate::spawn
343pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
344 let (s, r) = flume::bounded(capacity);
345 (Sender(s), Receiver(r))
346}
347
348/// Create a [`bounded`] channel with `0` capacity.
349///
350/// Rendezvous channels always awaits until the message is [received] to *return* from [`send`], there is no buffer.
351///
352/// # Examples
353///
354/// The example [spawns] two parallel tasks, the sender and receiver *handshake* when transferring the message, the
355/// receiver takes 2 seconds to receive, so the sender takes 2 seconds to send.
356///
357/// ```no_run
358/// use zng_task::{self as task, channel};
359/// # use zng_unit::*;
360/// # use std::time::*;
361/// # use zng_time::*;
362///
363/// let (sender, receiver) = channel::rendezvous();
364///
365/// task::spawn(async move {
366/// loop {
367/// let t = INSTANT.now();
368///
369/// if let Err(e) = sender.send("the stuff").await {
370/// eprintln!(r#"failed to send "{}", no receiver connected"#, e.0);
371/// break;
372/// }
373///
374/// assert!(t.elapsed() >= 2.secs());
375/// }
376/// });
377/// task::spawn(async move {
378/// loop {
379/// task::deadline(2.secs()).await;
380///
381/// match receiver.recv().await {
382/// Ok(msg) => println!(r#"got "{msg}""#),
383/// Err(_) => {
384/// eprintln!("no sender connected");
385/// break;
386/// }
387/// }
388/// }
389/// });
390/// ```
391///
392/// [`send`]: UnboundSender::send
393/// [received]: Receiver::recv
394/// [spawns]: crate::spawn
395pub fn rendezvous<T>() -> (Sender<T>, Receiver<T>) {
396 bounded::<T>(0)
397}