zng_task/channel.rs
1//! Communication channels.
2//!
3//! Use [`bounded`], [`unbounded`] and [`rendezvous`] to create channels for use across threads in the same process.
4//! Use [`ipc_unbounded`] to create channels that work across processes.
5//!
6//! # Examples
7//!
8//! ```no_run
9//! use zng_task::{self as task, channel};
10//! # use zng_unit::*;
11//!
12//! let (sender, receiver) = channel::bounded(5);
13//!
14//! task::spawn(async move {
15//! task::deadline(5.secs()).await;
16//! if let Err(e) = sender.send("Data!").await {
17//! eprintln!("no receiver connected, did not send message: '{e}'")
18//! }
19//! });
20//! task::spawn(async move {
21//! match receiver.recv().await {
22//! Ok(msg) => println!("{msg}"),
23//! Err(_) => eprintln!("no message in channel and no sender connected"),
24//! }
25//! });
26//! ```
27//!
28//! [`flume`]: https://docs.rs/flume
29//! [`ipc-channel`]: https://docs.rs/ipc-channel
30
31use std::{fmt, sync::Arc, time::Duration};
32
33use zng_time::{Deadline, INSTANT};
34
35mod ipc;
36pub use ipc::{IpcReceiver, IpcSender, IpcValue, NamedIpcReceiver, NamedIpcSender, ipc_unbounded};
37
38mod ipc_bytes;
39pub use ipc_bytes::{IpcBytes, IpcBytesCast, IpcBytesMut, IpcBytesMutCast, IpcBytesWriter, IpcBytesWriterBlocking, WeakIpcBytes};
40
41#[cfg(ipc)]
42pub use ipc_bytes::{is_ipc_serialization, with_ipc_serialization};
43use zng_txt::ToTxt;
44
45/// The transmitting end of a channel.
46///
47/// Use [`unbounded`], [`bounded`] or [`rendezvous`] to create a channel.
48pub struct Sender<T>(flume::Sender<T>);
49impl<T> fmt::Debug for Sender<T> {
50 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51 write!(f, "Sender<{}>", pretty_type_name::pretty_type_name::<T>())
52 }
53}
54impl<T> Clone for Sender<T> {
55 fn clone(&self) -> Self {
56 Sender(self.0.clone())
57 }
58}
59impl<T> From<flume::Sender<T>> for Sender<T> {
60 fn from(s: flume::Sender<T>) -> Self {
61 Sender(s)
62 }
63}
64impl<T> From<Sender<T>> for flume::Sender<T> {
65 fn from(s: Sender<T>) -> Self {
66 s.0
67 }
68}
69impl<T> Sender<T> {
70 /// Send a value into the channel.
71 ///
72 /// Waits until there is space in the channel buffer.
73 ///
74 /// Returns an error if all receivers have been dropped.
75 pub async fn send(&self, msg: T) -> Result<(), ChannelError> {
76 self.0.send_async(msg).await?;
77 Ok(())
78 }
79
80 /// Send a value into the channel.
81 ///
82 /// Waits until there is space in the channel buffer or the `deadline` is reached.
83 ///
84 /// Returns an error if all receivers have been dropped or the `deadline` is reached. The `msg` is lost in case of timeout.
85 pub async fn send_deadline(&self, msg: T, deadline: impl Into<Deadline>) -> Result<(), ChannelError> {
86 match super::with_deadline(self.send(msg), deadline).await {
87 Ok(r) => match r {
88 Ok(_) => Ok(()),
89 Err(e) => Err(e),
90 },
91 Err(_) => Err(ChannelError::Timeout),
92 }
93 }
94
95 /// Send a value into the channel.
96 ///
97 /// Blocks until there is space in the channel buffer.
98 ///
99 /// Returns an error if all receivers have been dropped.
100 pub fn send_blocking(&self, msg: T) -> Result<(), ChannelError> {
101 self.0.send(msg)?;
102 Ok(())
103 }
104
105 /// Send a value into the channel.
106 ///
107 /// Blocks until there is space in the channel buffer or the `deadline` is reached.
108 ///
109 /// Returns an error if all receivers have been dropped or the `deadline` is reached. The `msg` is lost in case of timeout.
110 pub fn send_deadline_blocking(&self, msg: T, deadline: impl Into<Deadline>) -> Result<(), ChannelError> {
111 super::block_on(self.send_deadline(msg, deadline))
112 }
113
114 /// Gets if the channel has no pending messages.
115 ///
116 /// Note that [`rendezvous`] channels are always empty.
117 pub fn is_empty(&self) -> bool {
118 self.0.is_empty()
119 }
120}
121
122/// The receiving end of a channel.
123///
124/// Use [`unbounded`], [`bounded`] or [`rendezvous`] to create a channel.
125///
126/// # Work Stealing
127///
128/// Cloning the receiver **does not** turn this channel into a broadcast channel.
129/// Each message will only be received by a single receiver. You can use this to
130/// to implement work stealing.
131pub struct Receiver<T>(flume::Receiver<T>);
132impl<T> fmt::Debug for Receiver<T> {
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 write!(f, "Receiver<{}>", pretty_type_name::pretty_type_name::<T>())
135 }
136}
137impl<T> Clone for Receiver<T> {
138 fn clone(&self) -> Self {
139 Receiver(self.0.clone())
140 }
141}
142impl<T> Receiver<T> {
143 /// Wait for an incoming value from the channel associated with this receiver.
144 ///
145 /// Returns an error if all senders have been dropped.
146 pub async fn recv(&self) -> Result<T, ChannelError> {
147 let r = self.0.recv_async().await?;
148 Ok(r)
149 }
150
151 /// Wait for an incoming value from the channel associated with this receiver.
152 ///
153 /// Returns an error if all senders have been dropped or the `deadline` is reached.
154 pub async fn recv_deadline(&self, deadline: impl Into<Deadline>) -> Result<T, ChannelError> {
155 match super::with_deadline(self.recv(), deadline).await {
156 Ok(r) => match r {
157 Ok(m) => Ok(m),
158 e => e,
159 },
160 Err(_) => Err(ChannelError::Timeout),
161 }
162 }
163
164 /// Wait for an incoming value from the channel associated with this receiver.
165 ///
166 /// Returns an error if all senders have been dropped.
167 pub fn recv_blocking(&self) -> Result<T, ChannelError> {
168 let r = self.0.recv()?;
169 Ok(r)
170 }
171
172 /// Block for an incoming value from the channel associated with this receiver.
173 ///
174 /// Returns an error if all senders have been dropped or the `deadline` is reached.
175 pub fn recv_deadline_blocking(&self, deadline: impl Into<Deadline>) -> Result<T, ChannelError> {
176 self.recv_deadline_blocking_impl(deadline.into())
177 }
178 fn recv_deadline_blocking_impl(&self, deadline: Deadline) -> Result<T, ChannelError> {
179 // Improve timeout precision because this is used in the app main loop and timers are implemented using it
180
181 const WORST_SLEEP_ERR: Duration = Duration::from_millis(if cfg!(windows) { 20 } else { 10 });
182 const WORST_SPIN_ERR: Duration = Duration::from_millis(if cfg!(windows) { 2 } else { 1 });
183
184 loop {
185 if let Some(d) = deadline.0.checked_duration_since(INSTANT.now()) {
186 if matches!(INSTANT.mode(), zng_time::InstantMode::Manual) {
187 // manual time is probably desynced from `Instant`, so we use `recv_timeout` that
188 // is slightly less precise, but an app in manual mode probably does not care.
189 match self.0.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
190 Err(flume::RecvTimeoutError::Timeout) => continue, // continue to try_recv spin
191 interrupt => return interrupt.map_err(ChannelError::from),
192 }
193 } else if d > WORST_SLEEP_ERR {
194 // probably sleeps here.
195 #[cfg(not(target_arch = "wasm32"))]
196 match self.0.recv_deadline(deadline.0.checked_sub(WORST_SLEEP_ERR).unwrap().into()) {
197 Err(flume::RecvTimeoutError::Timeout) => continue, // continue to try_recv spin
198 interrupt => return interrupt.map_err(ChannelError::from),
199 }
200
201 #[cfg(target_arch = "wasm32")] // this actually panics because flume tries to use Instant::now
202 match self.0.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
203 Err(flume::RecvTimeoutError::Timeout) => continue, // continue to try_recv spin
204 interrupt => return interrupt.map_err(ChannelError::from),
205 }
206 } else if d > WORST_SPIN_ERR {
207 let spin_deadline = Deadline(deadline.0.checked_sub(WORST_SPIN_ERR).unwrap());
208
209 // try_recv spin
210 while !spin_deadline.has_elapsed() {
211 match self.0.try_recv() {
212 Err(flume::TryRecvError::Empty) => std::thread::yield_now(),
213 interrupt => return interrupt.map_err(ChannelError::from),
214 }
215 }
216 continue; // continue to timeout spin
217 } else {
218 // last millis spin for better timeout precision
219 while !deadline.has_elapsed() {
220 std::thread::yield_now();
221 }
222 return Err(ChannelError::Timeout);
223 }
224 } else {
225 return Err(ChannelError::Timeout);
226 }
227 }
228 }
229
230 /// Returns the next incoming message in the channel or `None`.
231 pub fn try_recv(&self) -> Result<Option<T>, ChannelError> {
232 match self.0.try_recv() {
233 Ok(r) => Ok(Some(r)),
234 Err(e) => match e {
235 flume::TryRecvError::Empty => Ok(None),
236 flume::TryRecvError::Disconnected => Err(ChannelError::disconnected()),
237 },
238 }
239 }
240
241 /// Create a blocking iterator that receives until a channel error.
242 pub fn iter(&self) -> impl Iterator<Item = T> {
243 self.0.iter()
244 }
245
246 /// Iterate over all the pending incoming messages in the channel, until the channel is empty or error.
247 pub fn try_iter(&self) -> impl Iterator<Item = T> {
248 self.0.try_iter()
249 }
250
251 /// Gets if the channel has no pending messages.
252 ///
253 /// Note that [`rendezvous`] channels are always empty.
254 pub fn is_empty(&self) -> bool {
255 self.0.is_empty()
256 }
257}
258
259/// Create a channel with no maximum capacity.
260///
261/// Unbound channels always [`send`] messages immediately, never yielding on await.
262/// If the messages are not [received] they accumulate in the channel buffer.
263///
264/// # Examples
265///
266/// The example [spawns] two parallel tasks, the receiver task takes a while to start receiving but then
267/// rapidly consumes all messages in the buffer and new messages as they are send.
268///
269/// ```no_run
270/// use zng_task::{self as task, channel};
271/// # use zng_unit::*;
272///
273/// let (sender, receiver) = channel::unbounded();
274///
275/// task::spawn(async move {
276/// for msg in ["Hello!", "Are you still there?"].into_iter().cycle() {
277/// task::deadline(300.ms()).await;
278/// if let Err(e) = sender.send(msg).await {
279/// eprintln!("no receiver connected, the message `{e}` was not send");
280/// break;
281/// }
282/// }
283/// });
284/// task::spawn(async move {
285/// task::deadline(5.secs()).await;
286///
287/// loop {
288/// match receiver.recv().await {
289/// Ok(msg) => println!("{msg}"),
290/// Err(_) => {
291/// eprintln!("no message in channel and no sender connected");
292/// break;
293/// }
294/// }
295/// }
296/// });
297/// ```
298///
299/// Note that you don't need to `.await` on [`send`] as there is always space in the channel buffer.
300///
301/// [`send`]: Sender::send
302/// [received]: Receiver::recv
303/// [spawns]: crate::spawn
304pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
305 let (s, r) = flume::unbounded();
306 (Sender(s), Receiver(r))
307}
308
309/// Create a channel with a maximum capacity.
310///
311/// Bounded channels [`send`] until the channel reaches its capacity then it awaits until a message
312/// is [received] before sending another message.
313///
314/// # Examples
315///
316/// The example [spawns] two parallel tasks, the receiver task takes a while to start receiving but then
317/// rapidly consumes the 2 messages in the buffer and unblocks the sender to send more messages.
318///
319/// ```no_run
320/// use zng_task::{self as task, channel};
321/// # use zng_unit::*;
322///
323/// let (sender, receiver) = channel::bounded(2);
324///
325/// task::spawn(async move {
326/// for msg in ["Hello!", "Data!"].into_iter().cycle() {
327/// task::deadline(300.ms()).await;
328/// if let Err(e) = sender.send(msg).await {
329/// eprintln!("no receiver connected, the message `{e}` was not send");
330/// break;
331/// }
332/// }
333/// });
334/// task::spawn(async move {
335/// task::deadline(5.secs()).await;
336///
337/// loop {
338/// match receiver.recv().await {
339/// Ok(msg) => println!("{msg}"),
340/// Err(_) => {
341/// eprintln!("no message in channel and no sender connected");
342/// break;
343/// }
344/// }
345/// }
346/// });
347/// ```
348///
349/// [`send`]: Sender::send
350/// [received]: Receiver::recv
351/// [spawns]: crate::spawn
352pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
353 let (s, r) = flume::bounded(capacity);
354 (Sender(s), Receiver(r))
355}
356
357/// Create a [`bounded`] channel with `0` capacity.
358///
359/// Rendezvous channels always awaits until the message is [received] to *return* from [`send`], there is no buffer.
360///
361/// # Examples
362///
363/// The example [spawns] two parallel tasks, the sender and receiver *handshake* when transferring the message, the
364/// receiver takes 2 seconds to receive, so the sender takes 2 seconds to send.
365///
366/// ```no_run
367/// use zng_task::{self as task, channel};
368/// # use zng_unit::*;
369/// # use std::time::*;
370/// # use zng_time::*;
371///
372/// let (sender, receiver) = channel::rendezvous();
373///
374/// task::spawn(async move {
375/// loop {
376/// let t = INSTANT.now();
377///
378/// if let Err(e) = sender.send("the stuff").await {
379/// eprintln!(r#"failed to send "{}", no receiver connected"#, e);
380/// break;
381/// }
382///
383/// assert!(t.elapsed() >= 2.secs());
384/// }
385/// });
386/// task::spawn(async move {
387/// loop {
388/// task::deadline(2.secs()).await;
389///
390/// match receiver.recv().await {
391/// Ok(msg) => println!(r#"got "{msg}""#),
392/// Err(_) => {
393/// eprintln!("no sender connected");
394/// break;
395/// }
396/// }
397/// }
398/// });
399/// ```
400///
401/// [`send`]: Sender::send
402/// [received]: Receiver::recv
403/// [spawns]: crate::spawn
404pub fn rendezvous<T>() -> (Sender<T>, Receiver<T>) {
405 bounded::<T>(0)
406}
407
/// Error during channel send or receive.
#[derive(Debug, Clone)]
pub enum ChannelError {
    /// Channel has disconnected.
    Disconnected {
        /// Inner error that caused the disconnection.
        ///
        /// Is `None` if disconnection was due to endpoint dropping or if the error happened at the other endpoint.
        cause: Option<Arc<dyn std::error::Error + Send + Sync + 'static>>,
    },
    /// Deadline elapsed before the message could be sent/received.
    Timeout,
}
impl ChannelError {
    /// Channel has disconnected due to endpoint drop.
    ///
    /// The returned error has no `cause`.
    pub fn disconnected() -> Self {
        ChannelError::Disconnected { cause: None }
    }

    /// Channel has disconnected due to another error.
    ///
    /// The `cause` is available as the [`source`] of the returned error.
    ///
    /// [`source`]: std::error::Error::source
    pub fn disconnected_by(cause: impl std::error::Error + Send + Sync + 'static) -> Self {
        ChannelError::Disconnected {
            cause: Some(Arc::new(cause)),
        }
    }
}
impl fmt::Display for ChannelError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ChannelError::Disconnected { cause: Some(e) } => write!(f, "channel disconnected due to, {e}"),
            ChannelError::Disconnected { cause: None } => f.write_str("channel disconnected"),
            ChannelError::Timeout => f.write_str("deadline elapsed before message could be transferred"),
        }
    }
}
impl std::error::Error for ChannelError {
    /// Returns the disconnection `cause`, if there is one.
    ///
    /// The returned reference is to the cause error itself, so it can be downcast to the concrete
    /// error type that was given to [`ChannelError::disconnected_by`].
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            ChannelError::Disconnected { cause: Some(e) } => {
                // deref through the `Arc`, coercing `&Arc<dyn Error>` directly would expose the
                // `Arc` wrapper as the source and `downcast_ref` to the real cause type would fail.
                let cause: &(dyn std::error::Error + 'static) = &**e;
                Some(cause)
            }
            ChannelError::Disconnected { cause: None } | ChannelError::Timeout => None,
        }
    }
}
454impl PartialEq for ChannelError {
455 fn eq(&self, other: &Self) -> bool {
456 match (self, other) {
457 (Self::Disconnected { cause: l_cause }, Self::Disconnected { cause: r_cause }) => match (l_cause, r_cause) {
458 (None, None) => true,
459 (Some(a), Some(b)) => a.to_txt() == b.to_txt(),
460 _ => false,
461 },
462 _ => core::mem::discriminant(self) == core::mem::discriminant(other),
463 }
464 }
465}
466impl Eq for ChannelError {}
467impl From<flume::RecvError> for ChannelError {
468 fn from(value: flume::RecvError) -> Self {
469 match value {
470 flume::RecvError::Disconnected => ChannelError::disconnected(),
471 }
472 }
473}
474impl From<flume::RecvTimeoutError> for ChannelError {
475 fn from(value: flume::RecvTimeoutError) -> Self {
476 match value {
477 flume::RecvTimeoutError::Timeout => ChannelError::Timeout,
478 flume::RecvTimeoutError::Disconnected => ChannelError::disconnected(),
479 }
480 }
481}
482impl<T> From<flume::SendError<T>> for ChannelError {
483 fn from(_: flume::SendError<T>) -> Self {
484 ChannelError::disconnected()
485 }
486}
487impl From<flume::TryRecvError> for ChannelError {
488 fn from(value: flume::TryRecvError) -> Self {
489 match value {
490 flume::TryRecvError::Empty => ChannelError::Timeout,
491 flume::TryRecvError::Disconnected => ChannelError::disconnected(),
492 }
493 }
494}