zng_task/
channel.rs

1//! Communication channels.
2//!
3//! Use [`bounded`], [`unbounded`] and [`rendezvous`] to create channels for use across threads in the same process.
4//! Use [`ipc_unbounded`] to create channels that work across processes.
5//!
6//! # Examples
7//!
8//! ```no_run
9//! use zng_task::{self as task, channel};
10//! # use zng_unit::*;
11//!
12//! let (sender, receiver) = channel::bounded(5);
13//!
14//! task::spawn(async move {
15//!     task::deadline(5.secs()).await;
16//!     if let Err(e) = sender.send("Data!").await {
17//!         eprintln!("no receiver connected, did not send message: '{e}'")
18//!     }
19//! });
20//! task::spawn(async move {
21//!     match receiver.recv().await {
22//!         Ok(msg) => println!("{msg}"),
23//!         Err(_) => eprintln!("no message in channel and no sender connected"),
24//!     }
25//! });
26//! ```
27//!
28//! [`flume`]: https://docs.rs/flume
29//! [`ipc-channel`]: https://docs.rs/ipc-channel
30
31use std::{fmt, sync::Arc, time::Duration};
32
33use zng_time::{Deadline, INSTANT};
34
35mod ipc;
36pub use ipc::{IpcReceiver, IpcSender, IpcValue, NamedIpcReceiver, NamedIpcSender, ipc_unbounded};
37
38mod ipc_bytes;
39pub use ipc_bytes::{
40    IpcBytes, IpcBytesCast, IpcBytesCastIntoIter, IpcBytesIntoIter, IpcBytesMut, IpcBytesMutCast, IpcBytesWriter, IpcBytesWriterBlocking,
41    WeakIpcBytes,
42};
43
44#[cfg(ipc)]
45pub use ipc_bytes::{is_ipc_serialization, with_ipc_serialization};
46use zng_txt::ToTxt;
47
48/// The transmitting end of a channel.
49///
50/// Use [`unbounded`], [`bounded`] or [`rendezvous`] to create a channel.
51pub struct Sender<T>(flume::Sender<T>);
52impl<T> fmt::Debug for Sender<T> {
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        write!(f, "Sender<{}>", pretty_type_name::pretty_type_name::<T>())
55    }
56}
57impl<T> Clone for Sender<T> {
58    fn clone(&self) -> Self {
59        Sender(self.0.clone())
60    }
61}
62impl<T> From<flume::Sender<T>> for Sender<T> {
63    fn from(s: flume::Sender<T>) -> Self {
64        Sender(s)
65    }
66}
67impl<T> From<Sender<T>> for flume::Sender<T> {
68    fn from(s: Sender<T>) -> Self {
69        s.0
70    }
71}
72impl<T> Sender<T> {
73    /// Send a value into the channel.
74    ///
75    /// Waits until there is space in the channel buffer.
76    ///
77    /// Returns an error if all receivers have been dropped.
78    pub async fn send(&self, msg: T) -> Result<(), ChannelError> {
79        self.0.send_async(msg).await?;
80        Ok(())
81    }
82
83    /// Send a value into the channel.
84    ///
85    /// Waits until there is space in the channel buffer or the `deadline` is reached.
86    ///
87    /// Returns an error if all receivers have been dropped or the `deadline` is reached. The `msg` is lost in case of timeout.
88    pub async fn send_deadline(&self, msg: T, deadline: impl Into<Deadline>) -> Result<(), ChannelError> {
89        match super::with_deadline(self.send(msg), deadline).await {
90            Ok(r) => match r {
91                Ok(_) => Ok(()),
92                Err(e) => Err(e),
93            },
94            Err(_) => Err(ChannelError::Timeout),
95        }
96    }
97
98    /// Send a value into the channel.
99    ///
100    /// Blocks until there is space in the channel buffer.
101    ///
102    /// Returns an error if all receivers have been dropped.
103    pub fn send_blocking(&self, msg: T) -> Result<(), ChannelError> {
104        self.0.send(msg)?;
105        Ok(())
106    }
107
108    /// Send a value into the channel.
109    ///
110    /// Blocks until there is space in the channel buffer or the `deadline` is reached.
111    ///
112    /// Returns an error if all receivers have been dropped or the `deadline` is reached. The `msg` is lost in case of timeout.
113    pub fn send_deadline_blocking(&self, msg: T, deadline: impl Into<Deadline>) -> Result<(), ChannelError> {
114        super::block_on(self.send_deadline(msg, deadline))
115    }
116
117    /// Gets if the channel has no pending messages.
118    ///
119    /// Note that [`rendezvous`] channels are always empty.
120    pub fn is_empty(&self) -> bool {
121        self.0.is_empty()
122    }
123}
124
125/// The receiving end of a channel.
126///
127/// Use [`unbounded`], [`bounded`] or [`rendezvous`] to create a channel.
128///
129/// # Work Stealing
130///
131/// Cloning the receiver **does not** turn this channel into a broadcast channel.
132/// Each message will only be received by a single receiver. You can use this to
133/// to implement work stealing.
134pub struct Receiver<T>(flume::Receiver<T>);
135impl<T> fmt::Debug for Receiver<T> {
136    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137        write!(f, "Receiver<{}>", pretty_type_name::pretty_type_name::<T>())
138    }
139}
140impl<T> Clone for Receiver<T> {
141    fn clone(&self) -> Self {
142        Receiver(self.0.clone())
143    }
144}
145impl<T> Receiver<T> {
146    /// Wait for an incoming value from the channel associated with this receiver.
147    ///
148    /// Returns an error if all senders have been dropped.
149    pub async fn recv(&self) -> Result<T, ChannelError> {
150        let r = self.0.recv_async().await?;
151        Ok(r)
152    }
153
154    /// Wait for an incoming value from the channel associated with this receiver.
155    ///
156    /// Returns an error if all senders have been dropped or the `deadline` is reached.
157    pub async fn recv_deadline(&self, deadline: impl Into<Deadline>) -> Result<T, ChannelError> {
158        match super::with_deadline(self.recv(), deadline).await {
159            Ok(r) => match r {
160                Ok(m) => Ok(m),
161                e => e,
162            },
163            Err(_) => Err(ChannelError::Timeout),
164        }
165    }
166
167    /// Wait for an incoming value from the channel associated with this receiver.
168    ///
169    /// Returns an error if all senders have been dropped.
170    pub fn recv_blocking(&self) -> Result<T, ChannelError> {
171        let r = self.0.recv()?;
172        Ok(r)
173    }
174
175    /// Block for an incoming value from the channel associated with this receiver.
176    ///
177    /// Returns an error if all senders have been dropped or the `deadline` is reached.
178    pub fn recv_deadline_blocking(&self, deadline: impl Into<Deadline>) -> Result<T, ChannelError> {
179        self.recv_deadline_blocking_impl(deadline.into())
180    }
181    fn recv_deadline_blocking_impl(&self, deadline: Deadline) -> Result<T, ChannelError> {
182        // Improve timeout precision because this is used in the app main loop and timers are implemented using it
183
184        const WORST_SLEEP_ERR: Duration = Duration::from_millis(if cfg!(windows) { 20 } else { 10 });
185        const WORST_SPIN_ERR: Duration = Duration::from_millis(if cfg!(windows) { 2 } else { 1 });
186
187        loop {
188            if let Some(d) = deadline.0.checked_duration_since(INSTANT.now()) {
189                if matches!(INSTANT.mode(), zng_time::InstantMode::Manual) {
190                    // manual time is probably desynced from `Instant`, so we use `recv_timeout` that
191                    // is slightly less precise, but an app in manual mode probably does not care.
192                    match self.0.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
193                        Err(flume::RecvTimeoutError::Timeout) => continue, // continue to try_recv spin
194                        interrupt => return interrupt.map_err(ChannelError::from),
195                    }
196                } else if d > WORST_SLEEP_ERR {
197                    // probably sleeps here.
198                    #[cfg(not(target_arch = "wasm32"))]
199                    match self.0.recv_deadline(deadline.0.checked_sub(WORST_SLEEP_ERR).unwrap().into()) {
200                        Err(flume::RecvTimeoutError::Timeout) => continue, // continue to try_recv spin
201                        interrupt => return interrupt.map_err(ChannelError::from),
202                    }
203
204                    #[cfg(target_arch = "wasm32")] // this actually panics because flume tries to use Instant::now
205                    match self.0.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
206                        Err(flume::RecvTimeoutError::Timeout) => continue, // continue to try_recv spin
207                        interrupt => return interrupt.map_err(ChannelError::from),
208                    }
209                } else if d > WORST_SPIN_ERR {
210                    let spin_deadline = Deadline(deadline.0.checked_sub(WORST_SPIN_ERR).unwrap());
211
212                    // try_recv spin
213                    while !spin_deadline.has_elapsed() {
214                        match self.0.try_recv() {
215                            Err(flume::TryRecvError::Empty) => std::thread::yield_now(),
216                            interrupt => return interrupt.map_err(ChannelError::from),
217                        }
218                    }
219                    continue; // continue to timeout spin
220                } else {
221                    // last millis spin for better timeout precision
222                    while !deadline.has_elapsed() {
223                        std::thread::yield_now();
224                    }
225                    return Err(ChannelError::Timeout);
226                }
227            } else {
228                return Err(ChannelError::Timeout);
229            }
230        }
231    }
232
233    /// Returns the next incoming message in the channel or `None`.
234    pub fn try_recv(&self) -> Result<Option<T>, ChannelError> {
235        match self.0.try_recv() {
236            Ok(r) => Ok(Some(r)),
237            Err(e) => match e {
238                flume::TryRecvError::Empty => Ok(None),
239                flume::TryRecvError::Disconnected => Err(ChannelError::disconnected()),
240            },
241        }
242    }
243
244    /// Create a blocking iterator that receives until a channel error.
245    pub fn iter(&self) -> impl Iterator<Item = T> {
246        self.0.iter()
247    }
248
249    /// Iterate over all the pending incoming messages in the channel, until the channel is empty or error.
250    pub fn try_iter(&self) -> impl Iterator<Item = T> {
251        self.0.try_iter()
252    }
253
254    /// Gets if the channel has no pending messages.
255    ///
256    /// Note that [`rendezvous`] channels are always empty.
257    pub fn is_empty(&self) -> bool {
258        self.0.is_empty()
259    }
260}
261
262/// Create a channel with no maximum capacity.
263///
264/// Unbound channels always [`send`] messages immediately, never yielding on await.
265/// If the messages are not [received] they accumulate in the channel buffer.
266///
267/// # Examples
268///
269/// The example [spawns] two parallel tasks, the receiver task takes a while to start receiving but then
270/// rapidly consumes all messages in the buffer and new messages as they are send.
271///
272/// ```no_run
273/// use zng_task::{self as task, channel};
274/// # use zng_unit::*;
275///
276/// let (sender, receiver) = channel::unbounded();
277///
278/// task::spawn(async move {
279///     for msg in ["Hello!", "Are you still there?"].into_iter().cycle() {
280///         task::deadline(300.ms()).await;
281///         if let Err(e) = sender.send(msg).await {
282///             eprintln!("no receiver connected, the message `{e}` was not send");
283///             break;
284///         }
285///     }
286/// });
287/// task::spawn(async move {
288///     task::deadline(5.secs()).await;
289///
290///     loop {
291///         match receiver.recv().await {
292///             Ok(msg) => println!("{msg}"),
293///             Err(_) => {
294///                 eprintln!("no message in channel and no sender connected");
295///                 break;
296///             }
297///         }
298///     }
299/// });
300/// ```
301///
302/// Note that you don't need to `.await` on [`send`] as there is always space in the channel buffer.
303///
304/// [`send`]: Sender::send
305/// [received]: Receiver::recv
306/// [spawns]: crate::spawn
307pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
308    let (s, r) = flume::unbounded();
309    (Sender(s), Receiver(r))
310}
311
312/// Create a channel with a maximum capacity.
313///
314/// Bounded channels [`send`] until the channel reaches its capacity then it awaits until a message
315/// is [received] before sending another message.
316///
317/// # Examples
318///
319/// The example [spawns] two parallel tasks, the receiver task takes a while to start receiving but then
320/// rapidly consumes the 2 messages in the buffer and unblocks the sender to send more messages.
321///
322/// ```no_run
323/// use zng_task::{self as task, channel};
324/// # use zng_unit::*;
325///
326/// let (sender, receiver) = channel::bounded(2);
327///
328/// task::spawn(async move {
329///     for msg in ["Hello!", "Data!"].into_iter().cycle() {
330///         task::deadline(300.ms()).await;
331///         if let Err(e) = sender.send(msg).await {
332///             eprintln!("no receiver connected, the message `{e}` was not send");
333///             break;
334///         }
335///     }
336/// });
337/// task::spawn(async move {
338///     task::deadline(5.secs()).await;
339///
340///     loop {
341///         match receiver.recv().await {
342///             Ok(msg) => println!("{msg}"),
343///             Err(_) => {
344///                 eprintln!("no message in channel and no sender connected");
345///                 break;
346///             }
347///         }
348///     }
349/// });
350/// ```
351///
352/// [`send`]: Sender::send
353/// [received]: Receiver::recv
354/// [spawns]: crate::spawn
355pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
356    let (s, r) = flume::bounded(capacity);
357    (Sender(s), Receiver(r))
358}
359
360/// Create a [`bounded`] channel with `0` capacity.
361///
362/// Rendezvous channels always awaits until the message is [received] to *return* from [`send`], there is no buffer.
363///
364/// # Examples
365///
366/// The example [spawns] two parallel tasks, the sender and receiver *handshake* when transferring the message, the
367/// receiver takes 2 seconds to receive, so the sender takes 2 seconds to send.
368///
369/// ```no_run
370/// use zng_task::{self as task, channel};
371/// # use zng_unit::*;
372/// # use std::time::*;
373/// # use zng_time::*;
374///
375/// let (sender, receiver) = channel::rendezvous();
376///
377/// task::spawn(async move {
378///     loop {
379///         let t = INSTANT.now();
380///
381///         if let Err(e) = sender.send("the stuff").await {
382///             eprintln!(r#"failed to send "{}", no receiver connected"#, e);
383///             break;
384///         }
385///
386///         assert!(t.elapsed() >= 2.secs());
387///     }
388/// });
389/// task::spawn(async move {
390///     loop {
391///         task::deadline(2.secs()).await;
392///
393///         match receiver.recv().await {
394///             Ok(msg) => println!(r#"got "{msg}""#),
395///             Err(_) => {
396///                 eprintln!("no sender connected");
397///                 break;
398///             }
399///         }
400///     }
401/// });
402/// ```
403///
404/// [`send`]: Sender::send
405/// [received]: Receiver::recv
406/// [spawns]: crate::spawn
407pub fn rendezvous<T>() -> (Sender<T>, Receiver<T>) {
408    bounded::<T>(0)
409}
410
411/// Error during channel send or receive.
412#[derive(Debug, Clone)]
413pub enum ChannelError {
414    /// Channel has disconnected.
415    Disconnected {
416        /// Inner error that caused disconnection.
417        ///
418        /// Is `None` if disconnection was due to endpoint dropping or if the error happened at the other endpoint.
419        cause: Option<Arc<dyn std::error::Error + Send + Sync + 'static>>,
420    },
421    /// Deadline elapsed before message could be send/received.
422    Timeout,
423}
424impl ChannelError {
425    /// Channel has disconnected due to endpoint drop.
426    pub fn disconnected() -> Self {
427        ChannelError::Disconnected { cause: None }
428    }
429
430    /// New from other `error`.
431    pub fn disconnected_by(cause: impl std::error::Error + Send + Sync + 'static) -> Self {
432        ChannelError::Disconnected {
433            cause: Some(Arc::new(cause)),
434        }
435    }
436}
437impl fmt::Display for ChannelError {
438    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
439        match self {
440            ChannelError::Disconnected { cause: source } => match source {
441                Some(e) => write!(f, "channel disconnected due to, {e}"),
442                None => write!(f, "channel disconnected"),
443            },
444            ChannelError::Timeout => write!(f, "deadline elapsed before message could be transferred"),
445        }
446    }
447}
448impl std::error::Error for ChannelError {
449    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
450        if let Self::Disconnected { cause: Some(e) } = self {
451            Some(e)
452        } else {
453            None
454        }
455    }
456}
457impl PartialEq for ChannelError {
458    fn eq(&self, other: &Self) -> bool {
459        match (self, other) {
460            (Self::Disconnected { cause: l_cause }, Self::Disconnected { cause: r_cause }) => match (l_cause, r_cause) {
461                (None, None) => true,
462                (Some(a), Some(b)) => a.to_txt() == b.to_txt(),
463                _ => false,
464            },
465            _ => core::mem::discriminant(self) == core::mem::discriminant(other),
466        }
467    }
468}
469impl Eq for ChannelError {}
470impl From<flume::RecvError> for ChannelError {
471    fn from(value: flume::RecvError) -> Self {
472        match value {
473            flume::RecvError::Disconnected => ChannelError::disconnected(),
474        }
475    }
476}
477impl From<flume::RecvTimeoutError> for ChannelError {
478    fn from(value: flume::RecvTimeoutError) -> Self {
479        match value {
480            flume::RecvTimeoutError::Timeout => ChannelError::Timeout,
481            flume::RecvTimeoutError::Disconnected => ChannelError::disconnected(),
482        }
483    }
484}
485impl<T> From<flume::SendError<T>> for ChannelError {
486    fn from(_: flume::SendError<T>) -> Self {
487        ChannelError::disconnected()
488    }
489}
490impl From<flume::TryRecvError> for ChannelError {
491    fn from(value: flume::TryRecvError) -> Self {
492        match value {
493            flume::TryRecvError::Empty => ChannelError::Timeout,
494            flume::TryRecvError::Disconnected => ChannelError::disconnected(),
495        }
496    }
497}