zng_task/channel.rs
1//! Communication channels.
2//!
3//! Use [`bounded`], [`unbounded`] and [`rendezvous`] to create channels for use across threads in the same process.
4//! Use [`ipc_unbounded`] to create channels that work across processes.
5//!
6//! # Examples
7//!
8//! ```no_run
9//! use zng_task::{self as task, channel};
10//! # use zng_unit::*;
11//!
12//! let (sender, receiver) = channel::bounded(5);
13//!
14//! task::spawn(async move {
15//! task::deadline(5.secs()).await;
16//! if let Err(e) = sender.send("Data!").await {
17//! eprintln!("no receiver connected, did not send message: '{e}'")
18//! }
19//! });
20//! task::spawn(async move {
21//! match receiver.recv().await {
22//! Ok(msg) => println!("{msg}"),
23//! Err(_) => eprintln!("no message in channel and no sender connected"),
24//! }
25//! });
26//! ```
27//!
28//! [`flume`]: https://docs.rs/flume
29//! [`ipc-channel`]: https://docs.rs/ipc-channel
30
31use std::{fmt, sync::Arc, time::Duration};
32
33use zng_time::{Deadline, INSTANT};
34
35mod ipc;
36pub use ipc::{IpcReceiver, IpcSender, IpcValue, NamedIpcReceiver, NamedIpcSender, ipc_unbounded};
37
38mod ipc_bytes;
39pub use ipc_bytes::{
40 IpcBytes, IpcBytesCast, IpcBytesCastIntoIter, IpcBytesIntoIter, IpcBytesMut, IpcBytesMutCast, IpcBytesWriter, IpcBytesWriterBlocking,
41 WeakIpcBytes,
42};
43
44#[cfg(ipc)]
45pub use ipc_bytes::{is_ipc_serialization, with_ipc_serialization};
46use zng_txt::ToTxt;
47
48/// The transmitting end of a channel.
49///
50/// Use [`unbounded`], [`bounded`] or [`rendezvous`] to create a channel.
51pub struct Sender<T>(flume::Sender<T>);
52impl<T> fmt::Debug for Sender<T> {
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 write!(f, "Sender<{}>", pretty_type_name::pretty_type_name::<T>())
55 }
56}
57impl<T> Clone for Sender<T> {
58 fn clone(&self) -> Self {
59 Sender(self.0.clone())
60 }
61}
62impl<T> From<flume::Sender<T>> for Sender<T> {
63 fn from(s: flume::Sender<T>) -> Self {
64 Sender(s)
65 }
66}
67impl<T> From<Sender<T>> for flume::Sender<T> {
68 fn from(s: Sender<T>) -> Self {
69 s.0
70 }
71}
72impl<T> Sender<T> {
73 /// Send a value into the channel.
74 ///
75 /// Waits until there is space in the channel buffer.
76 ///
77 /// Returns an error if all receivers have been dropped.
78 pub async fn send(&self, msg: T) -> Result<(), ChannelError> {
79 self.0.send_async(msg).await?;
80 Ok(())
81 }
82
83 /// Send a value into the channel.
84 ///
85 /// Waits until there is space in the channel buffer or the `deadline` is reached.
86 ///
87 /// Returns an error if all receivers have been dropped or the `deadline` is reached. The `msg` is lost in case of timeout.
88 pub async fn send_deadline(&self, msg: T, deadline: impl Into<Deadline>) -> Result<(), ChannelError> {
89 match super::with_deadline(self.send(msg), deadline).await {
90 Ok(r) => match r {
91 Ok(_) => Ok(()),
92 Err(e) => Err(e),
93 },
94 Err(_) => Err(ChannelError::Timeout),
95 }
96 }
97
98 /// Send a value into the channel.
99 ///
100 /// Blocks until there is space in the channel buffer.
101 ///
102 /// Returns an error if all receivers have been dropped.
103 pub fn send_blocking(&self, msg: T) -> Result<(), ChannelError> {
104 self.0.send(msg)?;
105 Ok(())
106 }
107
108 /// Send a value into the channel.
109 ///
110 /// Blocks until there is space in the channel buffer or the `deadline` is reached.
111 ///
112 /// Returns an error if all receivers have been dropped or the `deadline` is reached. The `msg` is lost in case of timeout.
113 pub fn send_deadline_blocking(&self, msg: T, deadline: impl Into<Deadline>) -> Result<(), ChannelError> {
114 super::block_on(self.send_deadline(msg, deadline))
115 }
116
117 /// Gets if the channel has no pending messages.
118 ///
119 /// Note that [`rendezvous`] channels are always empty.
120 pub fn is_empty(&self) -> bool {
121 self.0.is_empty()
122 }
123}
124
125/// The receiving end of a channel.
126///
127/// Use [`unbounded`], [`bounded`] or [`rendezvous`] to create a channel.
128///
129/// # Work Stealing
130///
131/// Cloning the receiver **does not** turn this channel into a broadcast channel.
132/// Each message will only be received by a single receiver. You can use this to
133/// to implement work stealing.
134pub struct Receiver<T>(flume::Receiver<T>);
135impl<T> fmt::Debug for Receiver<T> {
136 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137 write!(f, "Receiver<{}>", pretty_type_name::pretty_type_name::<T>())
138 }
139}
140impl<T> Clone for Receiver<T> {
141 fn clone(&self) -> Self {
142 Receiver(self.0.clone())
143 }
144}
145impl<T> Receiver<T> {
146 /// Wait for an incoming value from the channel associated with this receiver.
147 ///
148 /// Returns an error if all senders have been dropped.
149 pub async fn recv(&self) -> Result<T, ChannelError> {
150 let r = self.0.recv_async().await?;
151 Ok(r)
152 }
153
154 /// Wait for an incoming value from the channel associated with this receiver.
155 ///
156 /// Returns an error if all senders have been dropped or the `deadline` is reached.
157 pub async fn recv_deadline(&self, deadline: impl Into<Deadline>) -> Result<T, ChannelError> {
158 match super::with_deadline(self.recv(), deadline).await {
159 Ok(r) => match r {
160 Ok(m) => Ok(m),
161 e => e,
162 },
163 Err(_) => Err(ChannelError::Timeout),
164 }
165 }
166
167 /// Wait for an incoming value from the channel associated with this receiver.
168 ///
169 /// Returns an error if all senders have been dropped.
170 pub fn recv_blocking(&self) -> Result<T, ChannelError> {
171 let r = self.0.recv()?;
172 Ok(r)
173 }
174
175 /// Block for an incoming value from the channel associated with this receiver.
176 ///
177 /// Returns an error if all senders have been dropped or the `deadline` is reached.
178 pub fn recv_deadline_blocking(&self, deadline: impl Into<Deadline>) -> Result<T, ChannelError> {
179 self.recv_deadline_blocking_impl(deadline.into())
180 }
181 fn recv_deadline_blocking_impl(&self, deadline: Deadline) -> Result<T, ChannelError> {
182 // Improve timeout precision because this is used in the app main loop and timers are implemented using it
183
184 const WORST_SLEEP_ERR: Duration = Duration::from_millis(if cfg!(windows) { 20 } else { 10 });
185 const WORST_SPIN_ERR: Duration = Duration::from_millis(if cfg!(windows) { 2 } else { 1 });
186
187 loop {
188 if let Some(d) = deadline.0.checked_duration_since(INSTANT.now()) {
189 if matches!(INSTANT.mode(), zng_time::InstantMode::Manual) {
190 // manual time is probably desynced from `Instant`, so we use `recv_timeout` that
191 // is slightly less precise, but an app in manual mode probably does not care.
192 match self.0.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
193 Err(flume::RecvTimeoutError::Timeout) => continue, // continue to try_recv spin
194 interrupt => return interrupt.map_err(ChannelError::from),
195 }
196 } else if d > WORST_SLEEP_ERR {
197 // probably sleeps here.
198 #[cfg(not(target_arch = "wasm32"))]
199 match self.0.recv_deadline(deadline.0.checked_sub(WORST_SLEEP_ERR).unwrap().into()) {
200 Err(flume::RecvTimeoutError::Timeout) => continue, // continue to try_recv spin
201 interrupt => return interrupt.map_err(ChannelError::from),
202 }
203
204 #[cfg(target_arch = "wasm32")] // this actually panics because flume tries to use Instant::now
205 match self.0.recv_timeout(d.checked_sub(WORST_SLEEP_ERR).unwrap_or_default()) {
206 Err(flume::RecvTimeoutError::Timeout) => continue, // continue to try_recv spin
207 interrupt => return interrupt.map_err(ChannelError::from),
208 }
209 } else if d > WORST_SPIN_ERR {
210 let spin_deadline = Deadline(deadline.0.checked_sub(WORST_SPIN_ERR).unwrap());
211
212 // try_recv spin
213 while !spin_deadline.has_elapsed() {
214 match self.0.try_recv() {
215 Err(flume::TryRecvError::Empty) => std::thread::yield_now(),
216 interrupt => return interrupt.map_err(ChannelError::from),
217 }
218 }
219 continue; // continue to timeout spin
220 } else {
221 // last millis spin for better timeout precision
222 while !deadline.has_elapsed() {
223 std::thread::yield_now();
224 }
225 return Err(ChannelError::Timeout);
226 }
227 } else {
228 return Err(ChannelError::Timeout);
229 }
230 }
231 }
232
233 /// Returns the next incoming message in the channel or `None`.
234 pub fn try_recv(&self) -> Result<Option<T>, ChannelError> {
235 match self.0.try_recv() {
236 Ok(r) => Ok(Some(r)),
237 Err(e) => match e {
238 flume::TryRecvError::Empty => Ok(None),
239 flume::TryRecvError::Disconnected => Err(ChannelError::disconnected()),
240 },
241 }
242 }
243
244 /// Create a blocking iterator that receives until a channel error.
245 pub fn iter(&self) -> impl Iterator<Item = T> {
246 self.0.iter()
247 }
248
249 /// Iterate over all the pending incoming messages in the channel, until the channel is empty or error.
250 pub fn try_iter(&self) -> impl Iterator<Item = T> {
251 self.0.try_iter()
252 }
253
254 /// Gets if the channel has no pending messages.
255 ///
256 /// Note that [`rendezvous`] channels are always empty.
257 pub fn is_empty(&self) -> bool {
258 self.0.is_empty()
259 }
260}
261
262/// Create a channel with no maximum capacity.
263///
264/// Unbound channels always [`send`] messages immediately, never yielding on await.
265/// If the messages are not [received] they accumulate in the channel buffer.
266///
267/// # Examples
268///
269/// The example [spawns] two parallel tasks, the receiver task takes a while to start receiving but then
270/// rapidly consumes all messages in the buffer and new messages as they are send.
271///
272/// ```no_run
273/// use zng_task::{self as task, channel};
274/// # use zng_unit::*;
275///
276/// let (sender, receiver) = channel::unbounded();
277///
278/// task::spawn(async move {
279/// for msg in ["Hello!", "Are you still there?"].into_iter().cycle() {
280/// task::deadline(300.ms()).await;
281/// if let Err(e) = sender.send(msg).await {
282/// eprintln!("no receiver connected, the message `{e}` was not send");
283/// break;
284/// }
285/// }
286/// });
287/// task::spawn(async move {
288/// task::deadline(5.secs()).await;
289///
290/// loop {
291/// match receiver.recv().await {
292/// Ok(msg) => println!("{msg}"),
293/// Err(_) => {
294/// eprintln!("no message in channel and no sender connected");
295/// break;
296/// }
297/// }
298/// }
299/// });
300/// ```
301///
302/// Note that you don't need to `.await` on [`send`] as there is always space in the channel buffer.
303///
304/// [`send`]: Sender::send
305/// [received]: Receiver::recv
306/// [spawns]: crate::spawn
307pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
308 let (s, r) = flume::unbounded();
309 (Sender(s), Receiver(r))
310}
311
312/// Create a channel with a maximum capacity.
313///
314/// Bounded channels [`send`] until the channel reaches its capacity then it awaits until a message
315/// is [received] before sending another message.
316///
317/// # Examples
318///
319/// The example [spawns] two parallel tasks, the receiver task takes a while to start receiving but then
320/// rapidly consumes the 2 messages in the buffer and unblocks the sender to send more messages.
321///
322/// ```no_run
323/// use zng_task::{self as task, channel};
324/// # use zng_unit::*;
325///
326/// let (sender, receiver) = channel::bounded(2);
327///
328/// task::spawn(async move {
329/// for msg in ["Hello!", "Data!"].into_iter().cycle() {
330/// task::deadline(300.ms()).await;
331/// if let Err(e) = sender.send(msg).await {
332/// eprintln!("no receiver connected, the message `{e}` was not send");
333/// break;
334/// }
335/// }
336/// });
337/// task::spawn(async move {
338/// task::deadline(5.secs()).await;
339///
340/// loop {
341/// match receiver.recv().await {
342/// Ok(msg) => println!("{msg}"),
343/// Err(_) => {
344/// eprintln!("no message in channel and no sender connected");
345/// break;
346/// }
347/// }
348/// }
349/// });
350/// ```
351///
352/// [`send`]: Sender::send
353/// [received]: Receiver::recv
354/// [spawns]: crate::spawn
355pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
356 let (s, r) = flume::bounded(capacity);
357 (Sender(s), Receiver(r))
358}
359
360/// Create a [`bounded`] channel with `0` capacity.
361///
362/// Rendezvous channels always awaits until the message is [received] to *return* from [`send`], there is no buffer.
363///
364/// # Examples
365///
366/// The example [spawns] two parallel tasks, the sender and receiver *handshake* when transferring the message, the
367/// receiver takes 2 seconds to receive, so the sender takes 2 seconds to send.
368///
369/// ```no_run
370/// use zng_task::{self as task, channel};
371/// # use zng_unit::*;
372/// # use std::time::*;
373/// # use zng_time::*;
374///
375/// let (sender, receiver) = channel::rendezvous();
376///
377/// task::spawn(async move {
378/// loop {
379/// let t = INSTANT.now();
380///
381/// if let Err(e) = sender.send("the stuff").await {
382/// eprintln!(r#"failed to send "{}", no receiver connected"#, e);
383/// break;
384/// }
385///
386/// assert!(t.elapsed() >= 2.secs());
387/// }
388/// });
389/// task::spawn(async move {
390/// loop {
391/// task::deadline(2.secs()).await;
392///
393/// match receiver.recv().await {
394/// Ok(msg) => println!(r#"got "{msg}""#),
395/// Err(_) => {
396/// eprintln!("no sender connected");
397/// break;
398/// }
399/// }
400/// }
401/// });
402/// ```
403///
404/// [`send`]: Sender::send
405/// [received]: Receiver::recv
406/// [spawns]: crate::spawn
407pub fn rendezvous<T>() -> (Sender<T>, Receiver<T>) {
408 bounded::<T>(0)
409}
410
/// Error during channel send or receive.
#[derive(Debug, Clone)]
pub enum ChannelError {
    /// Channel has disconnected.
    Disconnected {
        /// Inner error that caused disconnection.
        ///
        /// Is `None` if disconnection was due to endpoint dropping or if the error happened at the other endpoint.
        cause: Option<Arc<dyn std::error::Error + Send + Sync + 'static>>,
    },
    /// Deadline elapsed before message could be sent/received.
    Timeout,
}
impl ChannelError {
    /// Channel has disconnected due to endpoint drop.
    ///
    /// The returned error has no `cause`.
    pub fn disconnected() -> Self {
        ChannelError::Disconnected { cause: None }
    }

    /// New disconnection error caused by another `cause` error.
    ///
    /// The `cause` is reported by [`std::error::Error::source`] and included in the display message.
    pub fn disconnected_by(cause: impl std::error::Error + Send + Sync + 'static) -> Self {
        ChannelError::Disconnected {
            cause: Some(Arc::new(cause)),
        }
    }
}
impl fmt::Display for ChannelError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ChannelError::Disconnected { cause: Some(e) } => write!(f, "channel disconnected due to, {e}"),
            ChannelError::Disconnected { cause: None } => write!(f, "channel disconnected"),
            ChannelError::Timeout => write!(f, "deadline elapsed before message could be transferred"),
        }
    }
}
impl std::error::Error for ChannelError {
    /// Returns the disconnection cause, if there is one.
    ///
    /// The returned reference points to the error inside the `Arc`, not to the `Arc` itself, so
    /// `downcast_ref` and `is` work with the concrete type given to [`ChannelError::disconnected_by`].
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Disconnected { cause: Some(e) } => {
                // Deref through the `Arc`; coercing `&Arc<dyn Error>` directly would make the
                // `Arc` wrapper the concrete type of the trait object and break downcasting.
                let inner: &(dyn std::error::Error + 'static) = &**e;
                Some(inner)
            }
            Self::Disconnected { cause: None } | Self::Timeout => None,
        }
    }
}
457impl PartialEq for ChannelError {
458 fn eq(&self, other: &Self) -> bool {
459 match (self, other) {
460 (Self::Disconnected { cause: l_cause }, Self::Disconnected { cause: r_cause }) => match (l_cause, r_cause) {
461 (None, None) => true,
462 (Some(a), Some(b)) => a.to_txt() == b.to_txt(),
463 _ => false,
464 },
465 _ => core::mem::discriminant(self) == core::mem::discriminant(other),
466 }
467 }
468}
469impl Eq for ChannelError {}
470impl From<flume::RecvError> for ChannelError {
471 fn from(value: flume::RecvError) -> Self {
472 match value {
473 flume::RecvError::Disconnected => ChannelError::disconnected(),
474 }
475 }
476}
477impl From<flume::RecvTimeoutError> for ChannelError {
478 fn from(value: flume::RecvTimeoutError) -> Self {
479 match value {
480 flume::RecvTimeoutError::Timeout => ChannelError::Timeout,
481 flume::RecvTimeoutError::Disconnected => ChannelError::disconnected(),
482 }
483 }
484}
485impl<T> From<flume::SendError<T>> for ChannelError {
486 fn from(_: flume::SendError<T>) -> Self {
487 ChannelError::disconnected()
488 }
489}
490impl From<flume::TryRecvError> for ChannelError {
491 fn from(value: flume::TryRecvError) -> Self {
492 match value {
493 flume::TryRecvError::Empty => ChannelError::Timeout,
494 flume::TryRecvError::Disconnected => ChannelError::disconnected(),
495 }
496 }
497}