zng_task/
channel.rs

1//! Communication channels.
2//!
3//! Use [`bounded`], [`unbounded`] and [`rendezvous`] to create channels for use across threads in the same process.
4//! Use [`ipc_unbounded`] to create channels that work across processes.
5//!
6//! # Examples
7//!
8//! ```no_run
9//! use zng_task::{self as task, channel};
10//! # use zng_unit::*;
11//!
12//! let (sender, receiver) = channel::bounded(5);
13//!
14//! task::spawn(async move {
15//!     task::deadline(5.secs()).await;
16//!     if let Err(e) = sender.send("Data!").await {
17//!         eprintln!("no receiver connected, did not send message: '{e}'")
18//!     }
19//! });
20//! task::spawn(async move {
21//!     match receiver.recv().await {
22//!         Ok(msg) => println!("{msg}"),
23//!         Err(_) => eprintln!("no message in channel and no sender connected"),
24//!     }
25//! });
26//! ```
27//!
28//! [`flume`]: https://docs.rs/flume
29//! [`ipc-channel`]: https://docs.rs/ipc-channel
30
31use std::{fmt, sync::Arc, time::Duration};
32
33use zng_time::{Deadline, INSTANT};
34
35mod ipc;
36pub use ipc::{IpcReceiver, IpcSender, IpcValue, NamedIpcReceiver, NamedIpcSender, ipc_unbounded};
37
38mod ipc_bytes;
39pub use ipc_bytes::{IpcBytes, IpcBytesCast, IpcBytesMut, IpcBytesMutCast, IpcBytesWriter, IpcBytesWriterBlocking, WeakIpcBytes};
40
41#[cfg(ipc)]
42pub use ipc_bytes::{is_ipc_serialization, with_ipc_serialization};
43use zng_txt::ToTxt;
44
45/// The transmitting end of a channel.
46///
47/// Use [`unbounded`], [`bounded`] or [`rendezvous`] to create a channel.
48pub struct Sender<T>(flume::Sender<T>);
49impl<T> fmt::Debug for Sender<T> {
50    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51        write!(f, "Sender<{}>", pretty_type_name::pretty_type_name::<T>())
52    }
53}
54impl<T> Clone for Sender<T> {
55    fn clone(&self) -> Self {
56        Sender(self.0.clone())
57    }
58}
59impl<T> From<flume::Sender<T>> for Sender<T> {
60    fn from(s: flume::Sender<T>) -> Self {
61        Sender(s)
62    }
63}
64impl<T> From<Sender<T>> for flume::Sender<T> {
65    fn from(s: Sender<T>) -> Self {
66        s.0
67    }
68}
69impl<T> Sender<T> {
70    /// Send a value into the channel.
71    ///
72    /// Waits until there is space in the channel buffer.
73    ///
74    /// Returns an error if all receivers have been dropped.
75    pub async fn send(&self, msg: T) -> Result<(), ChannelError> {
76        self.0.send_async(msg).await?;
77        Ok(())
78    }
79
80    /// Send a value into the channel.
81    ///
82    /// Waits until there is space in the channel buffer or the `deadline` is reached.
83    ///
84    /// Returns an error if all receivers have been dropped or the `deadline` is reached. The `msg` is lost in case of timeout.
85    pub async fn send_deadline(&self, msg: T, deadline: impl Into<Deadline>) -> Result<(), ChannelError> {
86        match super::with_deadline(self.send(msg), deadline).await {
87            Ok(r) => match r {
88                Ok(_) => Ok(()),
89                Err(e) => Err(e),
90            },
91            Err(_) => Err(ChannelError::Timeout),
92        }
93    }
94
95    /// Send a value into the channel.
96    ///
97    /// Blocks until there is space in the channel buffer.
98    ///
99    /// Returns an error if all receivers have been dropped.
100    pub fn send_blocking(&self, msg: T) -> Result<(), ChannelError> {
101        self.0.send(msg)?;
102        Ok(())
103    }
104
105    /// Send a value into the channel.
106    ///
107    /// Blocks until there is space in the channel buffer or the `deadline` is reached.
108    ///
109    /// Returns an error if all receivers have been dropped or the `deadline` is reached. The `msg` is lost in case of timeout.
110    pub fn send_deadline_blocking(&self, msg: T, deadline: impl Into<Deadline>) -> Result<(), ChannelError> {
111        super::block_on(self.send_deadline(msg, deadline))
112    }
113
114    /// Gets if the channel has no pending messages.
115    ///
116    /// Note that [`rendezvous`] channels are always empty.
117    pub fn is_empty(&self) -> bool {
118        self.0.is_empty()
119    }
120}
121
122/// The receiving end of a channel.
123///
124/// Use [`unbounded`], [`bounded`] or [`rendezvous`] to create a channel.
125///
126/// # Work Stealing
127///
128/// Cloning the receiver **does not** turn this channel into a broadcast channel.
129/// Each message will only be received by a single receiver. You can use this to
130/// to implement work stealing.
131pub struct Receiver<T>(flume::Receiver<T>);
132impl<T> fmt::Debug for Receiver<T> {
133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134        write!(f, "Receiver<{}>", pretty_type_name::pretty_type_name::<T>())
135    }
136}
137impl<T> Clone for Receiver<T> {
138    fn clone(&self) -> Self {
139        Receiver(self.0.clone())
140    }
141}
142impl<T> Receiver<T> {
143    /// Wait for an incoming value from the channel associated with this receiver.
144    ///
145    /// Returns an error if all senders have been dropped.
146    pub async fn recv(&self) -> Result<T, ChannelError> {
147        let r = self.0.recv_async().await?;
148        Ok(r)
149    }
150
151    /// Wait for an incoming value from the channel associated with this receiver.
152    ///
153    /// Returns an error if all senders have been dropped or the `deadline` is reached.
154    pub async fn recv_deadline(&self, deadline: impl Into<Deadline>) -> Result<T, ChannelError> {
155        match super::with_deadline(self.recv(), deadline).await {
156            Ok(r) => match r {
157                Ok(m) => Ok(m),
158                e => e,
159            },
160            Err(_) => Err(ChannelError::Timeout),
161        }
162    }
163
164    /// Wait for an incoming value from the channel associated with this receiver.
165    ///
166    /// Returns an error if all senders have been dropped.
167    pub fn recv_blocking(&self) -> Result<T, ChannelError> {
168        let r = self.0.recv()?;
169        Ok(r)
170    }
171
172    /// Block for an incoming value from the channel associated with this receiver.
173    ///
174    /// Returns an error if all senders have been dropped or the `deadline` is reached.
175    pub fn recv_deadline_blocking(&self, deadline: impl Into<Deadline>) -> Result<T, ChannelError> {
176        self.recv_deadline_blocking_impl(deadline.into())
177    }
178    fn recv_deadline_blocking_impl(&self, deadline: Deadline) -> Result<T, ChannelError> {
179        // Improve timeout precision because this is used in the app main loop and timers are implemented using it
180
181        const WORST_SLEEP_ERR: Duration = Duration::from_millis(if cfg!(windows) { 20 } else { 10 });
182        const WORST_SPIN_ERR: Duration = Duration::from_millis(if cfg!(windows) { 2 } else { 1 });
183
184        loop {
185            if let Some(d) = deadline.0.checked_duration_since(INSTANT.now()) {
186                if matches!(INSTANT.mode(), zng_time::InstantMode::Manual) {
187                    // manual time is probably desynced from `Instant`, so we use `recv_timeout` that
188                    // is slightly less precise, but an app in manual mode probably does not care.
189                    match self.0.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
190                        Err(flume::RecvTimeoutError::Timeout) => continue, // continue to try_recv spin
191                        interrupt => return interrupt.map_err(ChannelError::from),
192                    }
193                } else if d > WORST_SLEEP_ERR {
194                    // probably sleeps here.
195                    #[cfg(not(target_arch = "wasm32"))]
196                    match self.0.recv_deadline(deadline.0.checked_sub(WORST_SLEEP_ERR).unwrap().into()) {
197                        Err(flume::RecvTimeoutError::Timeout) => continue, // continue to try_recv spin
198                        interrupt => return interrupt.map_err(ChannelError::from),
199                    }
200
201                    #[cfg(target_arch = "wasm32")] // this actually panics because flume tries to use Instant::now
202                    match self.0.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
203                        Err(flume::RecvTimeoutError::Timeout) => continue, // continue to try_recv spin
204                        interrupt => return interrupt.map_err(ChannelError::from),
205                    }
206                } else if d > WORST_SPIN_ERR {
207                    let spin_deadline = Deadline(deadline.0.checked_sub(WORST_SPIN_ERR).unwrap());
208
209                    // try_recv spin
210                    while !spin_deadline.has_elapsed() {
211                        match self.0.try_recv() {
212                            Err(flume::TryRecvError::Empty) => std::thread::yield_now(),
213                            interrupt => return interrupt.map_err(ChannelError::from),
214                        }
215                    }
216                    continue; // continue to timeout spin
217                } else {
218                    // last millis spin for better timeout precision
219                    while !deadline.has_elapsed() {
220                        std::thread::yield_now();
221                    }
222                    return Err(ChannelError::Timeout);
223                }
224            } else {
225                return Err(ChannelError::Timeout);
226            }
227        }
228    }
229
230    /// Returns the next incoming message in the channel or `None`.
231    pub fn try_recv(&self) -> Result<Option<T>, ChannelError> {
232        match self.0.try_recv() {
233            Ok(r) => Ok(Some(r)),
234            Err(e) => match e {
235                flume::TryRecvError::Empty => Ok(None),
236                flume::TryRecvError::Disconnected => Err(ChannelError::disconnected()),
237            },
238        }
239    }
240
241    /// Create a blocking iterator that receives until a channel error.
242    pub fn iter(&self) -> impl Iterator<Item = T> {
243        self.0.iter()
244    }
245
246    /// Iterate over all the pending incoming messages in the channel, until the channel is empty or error.
247    pub fn try_iter(&self) -> impl Iterator<Item = T> {
248        self.0.try_iter()
249    }
250
251    /// Gets if the channel has no pending messages.
252    ///
253    /// Note that [`rendezvous`] channels are always empty.
254    pub fn is_empty(&self) -> bool {
255        self.0.is_empty()
256    }
257}
258
259/// Create a channel with no maximum capacity.
260///
261/// Unbound channels always [`send`] messages immediately, never yielding on await.
262/// If the messages are not [received] they accumulate in the channel buffer.
263///
264/// # Examples
265///
266/// The example [spawns] two parallel tasks, the receiver task takes a while to start receiving but then
267/// rapidly consumes all messages in the buffer and new messages as they are send.
268///
269/// ```no_run
270/// use zng_task::{self as task, channel};
271/// # use zng_unit::*;
272///
273/// let (sender, receiver) = channel::unbounded();
274///
275/// task::spawn(async move {
276///     for msg in ["Hello!", "Are you still there?"].into_iter().cycle() {
277///         task::deadline(300.ms()).await;
278///         if let Err(e) = sender.send(msg).await {
279///             eprintln!("no receiver connected, the message `{e}` was not send");
280///             break;
281///         }
282///     }
283/// });
284/// task::spawn(async move {
285///     task::deadline(5.secs()).await;
286///
287///     loop {
288///         match receiver.recv().await {
289///             Ok(msg) => println!("{msg}"),
290///             Err(_) => {
291///                 eprintln!("no message in channel and no sender connected");
292///                 break;
293///             }
294///         }
295///     }
296/// });
297/// ```
298///
299/// Note that you don't need to `.await` on [`send`] as there is always space in the channel buffer.
300///
301/// [`send`]: Sender::send
302/// [received]: Receiver::recv
303/// [spawns]: crate::spawn
304pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
305    let (s, r) = flume::unbounded();
306    (Sender(s), Receiver(r))
307}
308
309/// Create a channel with a maximum capacity.
310///
311/// Bounded channels [`send`] until the channel reaches its capacity then it awaits until a message
312/// is [received] before sending another message.
313///
314/// # Examples
315///
316/// The example [spawns] two parallel tasks, the receiver task takes a while to start receiving but then
317/// rapidly consumes the 2 messages in the buffer and unblocks the sender to send more messages.
318///
319/// ```no_run
320/// use zng_task::{self as task, channel};
321/// # use zng_unit::*;
322///
323/// let (sender, receiver) = channel::bounded(2);
324///
325/// task::spawn(async move {
326///     for msg in ["Hello!", "Data!"].into_iter().cycle() {
327///         task::deadline(300.ms()).await;
328///         if let Err(e) = sender.send(msg).await {
329///             eprintln!("no receiver connected, the message `{e}` was not send");
330///             break;
331///         }
332///     }
333/// });
334/// task::spawn(async move {
335///     task::deadline(5.secs()).await;
336///
337///     loop {
338///         match receiver.recv().await {
339///             Ok(msg) => println!("{msg}"),
340///             Err(_) => {
341///                 eprintln!("no message in channel and no sender connected");
342///                 break;
343///             }
344///         }
345///     }
346/// });
347/// ```
348///
349/// [`send`]: Sender::send
350/// [received]: Receiver::recv
351/// [spawns]: crate::spawn
352pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
353    let (s, r) = flume::bounded(capacity);
354    (Sender(s), Receiver(r))
355}
356
357/// Create a [`bounded`] channel with `0` capacity.
358///
359/// Rendezvous channels always awaits until the message is [received] to *return* from [`send`], there is no buffer.
360///
361/// # Examples
362///
363/// The example [spawns] two parallel tasks, the sender and receiver *handshake* when transferring the message, the
364/// receiver takes 2 seconds to receive, so the sender takes 2 seconds to send.
365///
366/// ```no_run
367/// use zng_task::{self as task, channel};
368/// # use zng_unit::*;
369/// # use std::time::*;
370/// # use zng_time::*;
371///
372/// let (sender, receiver) = channel::rendezvous();
373///
374/// task::spawn(async move {
375///     loop {
376///         let t = INSTANT.now();
377///
378///         if let Err(e) = sender.send("the stuff").await {
379///             eprintln!(r#"failed to send "{}", no receiver connected"#, e);
380///             break;
381///         }
382///
383///         assert!(t.elapsed() >= 2.secs());
384///     }
385/// });
386/// task::spawn(async move {
387///     loop {
388///         task::deadline(2.secs()).await;
389///
390///         match receiver.recv().await {
391///             Ok(msg) => println!(r#"got "{msg}""#),
392///             Err(_) => {
393///                 eprintln!("no sender connected");
394///                 break;
395///             }
396///         }
397///     }
398/// });
399/// ```
400///
401/// [`send`]: Sender::send
402/// [received]: Receiver::recv
403/// [spawns]: crate::spawn
404pub fn rendezvous<T>() -> (Sender<T>, Receiver<T>) {
405    bounded::<T>(0)
406}
407
/// Error during channel send or receive.
#[derive(Debug, Clone)]
pub enum ChannelError {
    /// Channel has disconnected.
    Disconnected {
        /// Inner error that caused disconnection.
        ///
        /// Is `None` if disconnection was due to endpoint dropping or if the error happened at the other endpoint.
        cause: Option<Arc<dyn std::error::Error + Send + Sync + 'static>>,
    },
    /// Deadline elapsed before message could be sent/received.
    Timeout,
}
impl ChannelError {
    /// Channel has disconnected due to endpoint drop.
    ///
    /// The error has no `cause`.
    pub fn disconnected() -> Self {
        ChannelError::Disconnected { cause: None }
    }

    /// New disconnection caused by another error.
    ///
    /// The `cause` is returned by [`source`] and is included in the display text.
    ///
    /// [`source`]: std::error::Error::source
    pub fn disconnected_by(cause: impl std::error::Error + Send + Sync + 'static) -> Self {
        ChannelError::Disconnected {
            cause: Some(Arc::new(cause)),
        }
    }
}
impl fmt::Display for ChannelError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ChannelError::Disconnected { cause: Some(e) } => write!(f, "channel disconnected due to, {e}"),
            ChannelError::Disconnected { cause: None } => write!(f, "channel disconnected"),
            ChannelError::Timeout => write!(f, "deadline elapsed before message could be transferred"),
        }
    }
}
impl std::error::Error for ChannelError {
    /// Returns the disconnection `cause`, if there is one.
    ///
    /// The returned reference is to the error value passed to [`ChannelError::disconnected_by`], so it
    /// can be downcast to the concrete error type.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Disconnected { cause: Some(cause) } => {
                // Deref through the `Arc` before coercing. Coercing `&Arc<dyn Error + ..>` directly
                // turns the `Arc` itself into the trait object (`Arc<T>` implements `Error`), that
                // hides the concrete type and makes `downcast_ref` on the source always fail.
                let inner: &(dyn std::error::Error + 'static) = &**cause;
                Some(inner)
            }
            Self::Disconnected { cause: None } | Self::Timeout => None,
        }
    }
}
454impl PartialEq for ChannelError {
455    fn eq(&self, other: &Self) -> bool {
456        match (self, other) {
457            (Self::Disconnected { cause: l_cause }, Self::Disconnected { cause: r_cause }) => match (l_cause, r_cause) {
458                (None, None) => true,
459                (Some(a), Some(b)) => a.to_txt() == b.to_txt(),
460                _ => false,
461            },
462            _ => core::mem::discriminant(self) == core::mem::discriminant(other),
463        }
464    }
465}
466impl Eq for ChannelError {}
467impl From<flume::RecvError> for ChannelError {
468    fn from(value: flume::RecvError) -> Self {
469        match value {
470            flume::RecvError::Disconnected => ChannelError::disconnected(),
471        }
472    }
473}
474impl From<flume::RecvTimeoutError> for ChannelError {
475    fn from(value: flume::RecvTimeoutError) -> Self {
476        match value {
477            flume::RecvTimeoutError::Timeout => ChannelError::Timeout,
478            flume::RecvTimeoutError::Disconnected => ChannelError::disconnected(),
479        }
480    }
481}
482impl<T> From<flume::SendError<T>> for ChannelError {
483    fn from(_: flume::SendError<T>) -> Self {
484        ChannelError::disconnected()
485    }
486}
487impl From<flume::TryRecvError> for ChannelError {
488    fn from(value: flume::TryRecvError) -> Self {
489        match value {
490            flume::TryRecvError::Empty => ChannelError::Timeout,
491            flume::TryRecvError::Disconnected => ChannelError::disconnected(),
492        }
493    }
494}