zng_task/
lib.rs

1#![doc(html_favicon_url = "https://zng-ui.github.io/res/zng-logo-icon.png")]
2#![doc(html_logo_url = "https://zng-ui.github.io/res/zng-logo.png")]
3//!
4//! Parallel async tasks and async task runners.
5//!
6//! # Crate
7//!
8#![doc = include_str!(concat!("../", std::env!("CARGO_PKG_README")))]
9#![warn(unused_extern_crates)]
10#![warn(missing_docs)]
11
12use std::{
13    any::Any,
14    fmt,
15    hash::Hash,
16    mem, panic,
17    pin::Pin,
18    sync::{
19        Arc,
20        atomic::{AtomicBool, Ordering},
21    },
22    task::Poll,
23};
24
25#[doc(no_inline)]
26pub use parking_lot;
27use parking_lot::Mutex;
28
29use zng_app_context::{LocalContext, app_local};
30use zng_time::Deadline;
31use zng_var::{ResponseVar, VarValue, response_done_var, response_var};
32
33#[cfg(test)]
34mod tests;
35
36#[doc(no_inline)]
37pub use rayon;
38
39/// Async filesystem primitives.
40///
41/// This module is the [async-fs](https://docs.rs/async-fs) crate re-exported for convenience.
pub mod fs {
    // Inlined so rustdoc documents the `async_fs` items as items of this module.
    #[doc(inline)]
    pub use async_fs::*;
}
46
47pub mod channel;
48pub mod io;
49mod ui;
50
51pub mod http;
52
53pub mod ipc;
54
55mod rayon_ctx;
56
57pub use rayon_ctx::*;
58
59pub use ui::*;
60
61mod progress;
62pub use progress::*;
63
64/// Spawn a parallel async task, this function is not blocking and the `task` starts executing immediately.
65///
66/// # Parallel
67///
68/// The task runs in the primary [`rayon`] thread-pool, every [`poll`](Future::poll) happens inside a call to `rayon::spawn`.
69///
70/// You can use parallel iterators, `join` or any of rayon's utilities inside `task` to make it multi-threaded,
71/// otherwise it will run in a single thread at a time, still not blocking the UI.
72///
73/// The [`rayon`] crate is re-exported in `task::rayon` for convenience and compatibility.
74///
75/// # Async
76///
77/// The `task` is also a future so you can `.await`, after each `.await` the task continues executing in whatever `rayon` thread
78/// is free, so the `task` should either be doing CPU intensive work or awaiting, blocking IO operations
79/// block the thread from being used by other tasks reducing overall performance. You can use [`wait`] for IO
80/// or blocking operations and for networking you can use any of the async crates, as long as they start their own *event reactor*.
81///
82/// The `task` lives inside the [`Waker`] when awaiting and inside `rayon::spawn` when running.
83///
84/// # Examples
85///
86/// ```
87/// # use zng_task::{self as task, *, rayon::iter::*};
88/// # use zng_var::*;
89/// # struct SomeStruct { sum_response: ResponseVar<usize> }
90/// # impl SomeStruct {
91/// fn on_event(&mut self) {
92///     let (responder, response) = response_var();
93///     self.sum_response = response;
94///
95///     task::spawn(async move {
96///         let r = (0..1000).into_par_iter().map(|i| i * i).sum();
97///
98///         responder.respond(r);
99///     });
100/// }
101///
102/// fn on_update(&mut self) {
103///     if let Some(result) = self.sum_response.rsp_new() {
104///         println!("sum of squares 0..1000: {result}");
105///     }
106/// }
107/// # }
108/// ```
109///
110/// The example uses the `rayon` parallel iterator to compute a result and uses a [`response_var`] to send the result to the UI.
111/// The task captures the caller [`LocalContext`] so the response variable will set correctly.
112///
113/// Note that this function is the most basic way to spawn a parallel task where you must setup channels to the rest of the app yourself,
114/// you can use [`respond`] to avoid having to manually set a response, or [`run`] to `.await` the result.
115///
116/// # Panic Handling
117///
/// If the `task` panics the panic message is logged as an error, and can be observed using [`set_spawn_panic_handler`]. It
/// is otherwise ignored.
120///
121/// # Unwind Safety
122///
123/// This function disables the [unwind safety validation], meaning that in case of a panic shared
124/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
125/// poisoning mutexes or atomics to mutate shared data or use [`run_catch`] to detect a panic or [`run`]
126/// to propagate a panic.
127///
128/// [unwind safety validation]: std::panic::UnwindSafe
129/// [`Waker`]: std::task::Waker
130/// [`rayon`]: https://docs.rs/rayon
131/// [`LocalContext`]: zng_app_context::LocalContext
132/// [`response_var`]: zng_var::response_var
133pub fn spawn<F>(task: impl IntoFuture<IntoFuture = F>)
134where
135    F: Future<Output = ()> + Send + 'static,
136{
137    Arc::new(RayonTask {
138        ctx: LocalContext::capture(),
139        fut: Mutex::new(Some(Box::pin(task.into_future()))),
140    })
141    .poll()
142}
143
144/// Polls the `task` once immediately on the calling thread, if the `task` is pending, continues execution in [`spawn`].
145pub fn poll_spawn<F>(task: impl IntoFuture<IntoFuture = F>)
146where
147    F: Future<Output = ()> + Send + 'static,
148{
149    struct PollRayonTask {
150        fut: Mutex<Option<(RayonSpawnFut, Option<LocalContext>)>>,
151    }
152    impl PollRayonTask {
153        // start task in calling thread
154        fn poll(self: Arc<Self>) {
155            let mut task = self.fut.lock();
156            let (mut t, _) = task.take().unwrap();
157
158            let waker = self.clone().into();
159
160            match t.as_mut().poll(&mut std::task::Context::from_waker(&waker)) {
161                Poll::Ready(()) => {}
162                Poll::Pending => {
163                    let ctx = LocalContext::capture();
164                    *task = Some((t, Some(ctx)));
165                }
166            }
167        }
168    }
169    impl std::task::Wake for PollRayonTask {
170        fn wake(self: Arc<Self>) {
171            // continue task in spawn threads
172            if let Some((task, Some(ctx))) = self.fut.lock().take() {
173                Arc::new(RayonTask {
174                    ctx,
175                    fut: Mutex::new(Some(Box::pin(task))),
176                })
177                .poll();
178            }
179        }
180    }
181
182    Arc::new(PollRayonTask {
183        fut: Mutex::new(Some((Box::pin(task.into_future()), None))),
184    })
185    .poll()
186}
187
// Boxed, pinned and type-erased `Send` future with no output, the form stored by `RayonTask`.
type RayonSpawnFut = Pin<Box<dyn Future<Output = ()> + Send>>;
189
190// A future that is its own waker that polls inside rayon spawn tasks.
191struct RayonTask {
192    ctx: LocalContext,
193    fut: Mutex<Option<RayonSpawnFut>>,
194}
195impl RayonTask {
196    fn poll(self: Arc<Self>) {
197        rayon::spawn(move || {
198            // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
199            let mut task = self.fut.lock();
200            if let Some(mut t) = task.take() {
201                let waker = self.clone().into();
202
203                // load app context
204                self.ctx.clone().with_context(move || {
205                    let r = panic::catch_unwind(panic::AssertUnwindSafe(move || {
206                        // poll future
207                        if t.as_mut().poll(&mut std::task::Context::from_waker(&waker)).is_pending() {
208                            // not done
209                            *task = Some(t);
210                        }
211                    }));
212                    if let Err(p) = r {
213                        let p = TaskPanicError::new(p);
214                        tracing::error!("panic in `task::spawn`: {}", p.panic_str().unwrap_or(""));
215                        on_spawn_panic(p);
216                    }
217                });
218            }
219        })
220    }
221}
222impl std::task::Wake for RayonTask {
223    fn wake(self: Arc<Self>) {
224        self.poll()
225    }
226}
227
228/// Rayon join with local context.
229///
230/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
231/// operations.
232///
233/// See `rayon::join` for more details about join.
234///
235/// [`LocalContext`]: zng_app_context::LocalContext
236pub fn join<A, B, RA, RB>(op_a: A, op_b: B) -> (RA, RB)
237where
238    A: FnOnce() -> RA + Send,
239    B: FnOnce() -> RB + Send,
240    RA: Send,
241    RB: Send,
242{
243    self::join_context(move |_| op_a(), move |_| op_b())
244}
245
246/// Rayon join context with local context.
247///
248/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
249/// operations.
250///
251/// See `rayon::join_context` for more details about join.
252///
253/// [`LocalContext`]: zng_app_context::LocalContext
254pub fn join_context<A, B, RA, RB>(op_a: A, op_b: B) -> (RA, RB)
255where
256    A: FnOnce(rayon::FnContext) -> RA + Send,
257    B: FnOnce(rayon::FnContext) -> RB + Send,
258    RA: Send,
259    RB: Send,
260{
261    let ctx = LocalContext::capture();
262    let ctx = &ctx;
263    rayon::join_context(
264        move |a| {
265            if a.migrated() {
266                ctx.clone().with_context(|| op_a(a))
267            } else {
268                op_a(a)
269            }
270        },
271        move |b| {
272            if b.migrated() {
273                ctx.clone().with_context(|| op_b(b))
274            } else {
275                op_b(b)
276            }
277        },
278    )
279}
280
281/// Rayon scope with local context.
282///
283/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
284/// operations.
285///
286/// See `rayon::scope` for more details about scope.
287///
288/// [`LocalContext`]: zng_app_context::LocalContext
289pub fn scope<'scope, OP, R>(op: OP) -> R
290where
291    OP: FnOnce(ScopeCtx<'_, 'scope>) -> R + Send,
292    R: Send,
293{
294    let ctx = LocalContext::capture();
295
296    // Cast `&'_ ctx` to `&'scope ctx` to "inject" the context in the scope.
297    // Is there a better way to do this? I hope so.
298    //
299    // SAFETY:
300    // * We are extending `'_` to `'scope`, that is one of the documented valid usages of `transmute`.
301    // * No use after free because `rayon::scope` joins all threads before returning and we only drop `ctx` after.
302    let ctx_ref: &'_ LocalContext = &ctx;
303    let ctx_scope_ref: &'scope LocalContext = unsafe { std::mem::transmute(ctx_ref) };
304
305    let r = rayon::scope(move |s| {
306        op(ScopeCtx {
307            scope: s,
308            ctx: ctx_scope_ref,
309        })
310    });
311
312    drop(ctx);
313
314    r
315}
316
317/// Represents a fork-join scope which can be used to spawn any number of tasks that run in the caller's thread context.
318///
319/// See [`scope`] for more details.
320#[derive(Clone, Copy, Debug)]
321pub struct ScopeCtx<'a, 'scope: 'a> {
322    scope: &'a rayon::Scope<'scope>,
323    ctx: &'scope LocalContext,
324}
325impl<'a, 'scope: 'a> ScopeCtx<'a, 'scope> {
326    /// Spawns a job into the fork-join scope `self`. The job runs in the captured thread context.
327    ///
328    /// See `rayon::Scope::spawn` for more details.
329    pub fn spawn<F>(self, f: F)
330    where
331        F: FnOnce(ScopeCtx<'_, 'scope>) + Send + 'scope,
332    {
333        let ctx = self.ctx;
334        self.scope
335            .spawn(move |s| ctx.clone().with_context(move || f(ScopeCtx { scope: s, ctx })));
336    }
337}
338
339/// Spawn a parallel async task that can also be `.await` for the task result.
340///
341/// # Parallel
342///
343/// The task runs in the primary [`rayon`] thread-pool, every [`poll`](Future::poll) happens inside a call to `rayon::spawn`.
344///
345/// You can use parallel iterators, `join` or any of rayon's utilities inside `task` to make it multi-threaded,
346/// otherwise it will run in a single thread at a time, still not blocking the UI.
347///
348/// The [`rayon`] crate is re-exported in `task::rayon` for convenience and compatibility.
349///
350/// # Async
351///
352/// The `task` is also a future so you can `.await`, after each `.await` the task continues executing in whatever `rayon` thread
353/// is free, so the `task` should either be doing CPU intensive work or awaiting, blocking IO operations
354/// block the thread from being used by other tasks reducing overall performance. You can use [`wait`] for IO
355/// or blocking operations and for networking you can use any of the async crates, as long as they start their own *event reactor*.
356///
357/// The `task` lives inside the [`Waker`] when awaiting and inside `rayon::spawn` when running.
358///
359/// # Examples
360///
361/// ```
362/// # use zng_task::{self as task, rayon::iter::*};
363/// # struct SomeStruct { sum: usize }
364/// # async fn read_numbers() -> Vec<usize> { vec![] }
365/// # impl SomeStruct {
366/// async fn on_event(&mut self) {
367///     self.sum = task::run(async { read_numbers().await.par_iter().map(|i| i * i).sum() }).await;
368/// }
369/// # }
370/// ```
371///
372/// The example `.await` for some numbers and then uses a parallel iterator to compute a result, this all runs in parallel
373/// because it is inside a `run` task. The task result is then `.await` inside one of the UI async tasks. Note that the
374/// task captures the caller [`LocalContext`] so you can interact with variables and UI services directly inside the task too.
375///
376/// # Cancellation
377///
378/// The task starts running immediately, awaiting the returned future merely awaits for a message from the worker threads and
379/// that means the `task` future is not owned by the returned future. Usually to *cancel* a future you only need to drop it,
380/// in this task dropping the returned future will only drop the `task` once it reaches a `.await` point and detects that the
381/// result channel is disconnected.
382///
/// If you want to deterministically know that the `task` was cancelled use a cancellation signal.
384///
385/// # Panic Propagation
386///
387/// If the `task` panics the panic is resumed in the awaiting thread using [`resume_unwind`]. You
388/// can use [`run_catch`] to get the panic as an error instead.
389///
390/// [`resume_unwind`]: panic::resume_unwind
391/// [`Waker`]: std::task::Waker
392/// [`rayon`]: https://docs.rs/rayon
393/// [`LocalContext`]: zng_app_context::LocalContext
394pub async fn run<R, T>(task: impl IntoFuture<IntoFuture = T>) -> R
395where
396    R: Send + 'static,
397    T: Future<Output = R> + Send + 'static,
398{
399    match run_catch(task).await {
400        Ok(r) => r,
401        Err(p) => panic::resume_unwind(p.payload),
402    }
403}
404
405/// Like [`run`] but catches panics.
406///
/// This task works the same and has the same utility as [`run`], except it returns panic messages
/// as an error instead of propagating the panic.
409///
410/// # Unwind Safety
411///
412/// This function disables the [unwind safety validation], meaning that in case of a panic shared
413/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
414/// poisoning mutexes or atomics to mutate shared data or discard all shared data used in the `task`
415/// if this function returns an error.
416///
417/// [unwind safety validation]: std::panic::UnwindSafe
418pub async fn run_catch<R, T>(task: impl IntoFuture<IntoFuture = T>) -> Result<R, TaskPanicError>
419where
420    R: Send + 'static,
421    T: Future<Output = R> + Send + 'static,
422{
423    type Fut<R> = Pin<Box<dyn Future<Output = R> + Send>>;
424
425    // A future that is its own waker that polls inside the rayon primary thread-pool.
426    struct RayonCatchTask<R> {
427        ctx: LocalContext,
428        fut: Mutex<Option<Fut<R>>>,
429        sender: flume::Sender<Result<R, TaskPanicError>>,
430    }
431    impl<R: Send + 'static> RayonCatchTask<R> {
432        fn poll(self: Arc<Self>) {
433            let sender = self.sender.clone();
434            if sender.is_disconnected() {
435                return; // cancel.
436            }
437            rayon::spawn(move || {
438                // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
439                let mut task = self.fut.lock();
440                if let Some(mut t) = task.take() {
441                    let waker = self.clone().into();
442                    let mut cx = std::task::Context::from_waker(&waker);
443
444                    self.ctx.clone().with_context(|| {
445                        let r = panic::catch_unwind(panic::AssertUnwindSafe(|| t.as_mut().poll(&mut cx)));
446                        match r {
447                            Ok(Poll::Ready(r)) => {
448                                drop(task);
449                                let _ = sender.send(Ok(r));
450                            }
451                            Ok(Poll::Pending) => {
452                                *task = Some(t);
453                            }
454                            Err(p) => {
455                                drop(task);
456                                let _ = sender.send(Err(TaskPanicError::new(p)));
457                            }
458                        }
459                    });
460                }
461            })
462        }
463    }
464    impl<R: Send + 'static> std::task::Wake for RayonCatchTask<R> {
465        fn wake(self: Arc<Self>) {
466            self.poll()
467        }
468    }
469
470    let (sender, receiver) = channel::bounded(1);
471
472    Arc::new(RayonCatchTask {
473        ctx: LocalContext::capture(),
474        fut: Mutex::new(Some(Box::pin(task.into_future()))),
475        sender: sender.into(),
476    })
477    .poll();
478
479    receiver.recv().await.unwrap()
480}
481
482/// Spawn a parallel async task that will send its result to a [`ResponseVar<R>`].
483///
484/// The [`run`] documentation explains how `task` is *parallel* and *async*. The `task` starts executing immediately.
485///
486/// # Examples
487///
488/// ```
489/// # use zng_task::{self as task, rayon::iter::*};
490/// # use zng_var::*;
491/// # struct SomeStruct { sum_response: ResponseVar<usize> }
492/// # async fn read_numbers() -> Vec<usize> { vec![] }
493/// # impl SomeStruct {
494/// fn on_event(&mut self) {
495///     self.sum_response = task::respond(async { read_numbers().await.par_iter().map(|i| i * i).sum() });
496/// }
497///
498/// fn on_update(&mut self) {
499///     if let Some(result) = self.sum_response.rsp_new() {
500///         println!("sum of squares: {result}");
501///     }
502/// }
503/// # }
504/// ```
505///
/// The example `.await` for some numbers and then uses a parallel iterator to compute a result. The result is sent to
/// `sum_response` that is a [`ResponseVar<R>`].
508///
509/// # Cancellation
510///
511/// Dropping the [`ResponseVar<R>`] does not cancel the `task`, it will still run to completion.
512///
513/// # Panic Handling
514///
515/// If the `task` panics the panic is logged as an error and resumed in the response var modify closure.
516///
517/// [`resume_unwind`]: panic::resume_unwind
518/// [`ResponseVar<R>`]: zng_var::ResponseVar
519/// [`response_var`]: zng_var::response_var
520pub fn respond<R, F>(task: F) -> ResponseVar<R>
521where
522    R: VarValue,
523    F: Future<Output = R> + Send + 'static,
524{
525    type Fut<R> = Pin<Box<dyn Future<Output = R> + Send>>;
526
527    let (responder, response) = response_var();
528
529    // A future that is its own waker that polls inside the rayon primary thread-pool.
530    struct RayonRespondTask<R: VarValue> {
531        ctx: LocalContext,
532        fut: Mutex<Option<Fut<R>>>,
533        responder: zng_var::ResponderVar<R>,
534    }
535    impl<R: VarValue> RayonRespondTask<R> {
536        fn poll(self: Arc<Self>) {
537            let responder = self.responder.clone();
538            if responder.strong_count() == 2 {
539                return; // cancel.
540            }
541            rayon::spawn(move || {
542                // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
543                let mut task = self.fut.lock();
544                if let Some(mut t) = task.take() {
545                    let waker = self.clone().into();
546                    let mut cx = std::task::Context::from_waker(&waker);
547
548                    self.ctx.clone().with_context(|| {
549                        let r = panic::catch_unwind(panic::AssertUnwindSafe(|| t.as_mut().poll(&mut cx)));
550                        match r {
551                            Ok(Poll::Ready(r)) => {
552                                drop(task);
553
554                                responder.respond(r);
555                            }
556                            Ok(Poll::Pending) => {
557                                *task = Some(t);
558                            }
559                            Err(p) => {
560                                let p = TaskPanicError::new(p);
561                                tracing::error!("panic in `task::respond`: {}", p.panic_str().unwrap_or(""));
562                                drop(task);
563                                responder.modify(move |_| panic::resume_unwind(p.payload));
564                            }
565                        }
566                    });
567                }
568            })
569        }
570    }
571    impl<R: VarValue> std::task::Wake for RayonRespondTask<R> {
572        fn wake(self: Arc<Self>) {
573            self.poll()
574        }
575    }
576
577    Arc::new(RayonRespondTask {
578        ctx: LocalContext::capture(),
579        fut: Mutex::new(Some(Box::pin(task))),
580        responder,
581    })
582    .poll();
583
584    response
585}
586
587/// Polls the `task` once immediately on the calling thread, if the `task` is ready returns the response already set,
588/// if the `task` is pending continues execution like [`respond`].
589pub fn poll_respond<R, F>(task: impl IntoFuture<IntoFuture = F>) -> ResponseVar<R>
590where
591    R: VarValue,
592    F: Future<Output = R> + Send + 'static,
593{
594    enum QuickResponse<R: VarValue> {
595        Quick(Option<R>),
596        Response(zng_var::ResponderVar<R>),
597    }
598    let task = task.into_future();
599    let q = Arc::new(Mutex::new(QuickResponse::Quick(None)));
600    poll_spawn(zng_clone_move::async_clmv!(q, {
601        let rsp = task.await;
602
603        match &mut *q.lock() {
604            QuickResponse::Quick(q) => *q = Some(rsp),
605            QuickResponse::Response(r) => r.respond(rsp),
606        }
607    }));
608
609    let mut q = q.lock();
610    match &mut *q {
611        QuickResponse::Quick(q) if q.is_some() => response_done_var(q.take().unwrap()),
612        _ => {
613            let (responder, response) = response_var();
614            *q = QuickResponse::Response(responder);
615            response
616        }
617    }
618}
619
620/// Create a parallel `task` that blocks awaiting for an IO operation, the `task` starts on the first `.await`.
621///
622/// # Parallel
623///
624/// The `task` runs in the [`blocking`] thread-pool which is optimized for awaiting blocking operations.
625/// If the `task` is computation heavy you should use [`run`] and then `wait` inside that task for the
626/// parts that are blocking.
627///
628/// # Examples
629///
630/// ```
631/// # fn main() { }
632/// # use zng_task as task;
633/// # async fn example() {
634/// task::wait(|| std::fs::read_to_string("file.txt")).await
635/// # ; }
636/// ```
637///
/// The example reads a file, that is a blocking file IO operation, most of the time is spent waiting for the operating system,
/// so we offload this to a `wait` task. The task can be `.await` inside a [`run`] task or inside one of the UI tasks
/// like in an async event handler.
641///
642/// # Async Read/Write
643///
644/// For [`std::io::Read`] and [`std::io::Write`] operations you can also use [`io`] and [`fs`] alternatives when you don't
645/// have or want the full file in memory or when you want to apply multiple operations to the file.
646///
647/// # Panic Propagation
648///
649/// If the `task` panics the panic is resumed in the awaiting thread using [`resume_unwind`]. You
650/// can use [`wait_catch`] to get the panic as an error instead.
651///
652/// [`blocking`]: https://docs.rs/blocking
653/// [`resume_unwind`]: panic::resume_unwind
654pub async fn wait<T, F>(task: F) -> T
655where
656    F: FnOnce() -> T + Send + 'static,
657    T: Send + 'static,
658{
659    match wait_catch(task).await {
660        Ok(r) => r,
661        Err(p) => panic::resume_unwind(p.payload),
662    }
663}
664
665/// Like [`wait`] but catches panics.
666///
/// This task works the same and has the same utility as [`wait`], except it returns panic messages
/// as an error instead of propagating the panic.
669///
670/// # Unwind Safety
671///
672/// This function disables the [unwind safety validation], meaning that in case of a panic shared
673/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
674/// poisoning mutexes or atomics to mutate shared data or discard all shared data used in the `task`
675/// if this function returns an error.
676///
677/// [unwind safety validation]: std::panic::UnwindSafe
678pub async fn wait_catch<T, F>(task: F) -> Result<T, TaskPanicError>
679where
680    F: FnOnce() -> T + Send + 'static,
681    T: Send + 'static,
682{
683    let mut ctx = LocalContext::capture();
684    blocking::unblock(move || ctx.with_context(move || panic::catch_unwind(panic::AssertUnwindSafe(task))))
685        .await
686        .map_err(TaskPanicError::new)
687}
688
689/// Fire and forget a [`wait`] task. The `task` starts executing immediately.
690///
691/// # Panic Handling
692///
/// If the `task` panics the panic message is logged as an error, and can be observed using [`set_spawn_panic_handler`]. It
/// is otherwise ignored.
695///
696/// # Unwind Safety
697///
698/// This function disables the [unwind safety validation], meaning that in case of a panic shared
699/// data can end-up in an invalid (still memory safe) state. If you are worried about that only use
700/// poisoning mutexes or atomics to mutate shared data or use [`wait_catch`] to detect a panic or [`wait`]
701/// to propagate a panic.
702///
703/// [unwind safety validation]: std::panic::UnwindSafe
704pub fn spawn_wait<F>(task: F)
705where
706    F: FnOnce() + Send + 'static,
707{
708    spawn(async move {
709        if let Err(p) = wait_catch(task).await {
710            tracing::error!("parallel `spawn_wait` task panicked: {}", p.panic_str().unwrap_or(""));
711            on_spawn_panic(p);
712        }
713    });
714}
715
716/// Like [`spawn_wait`], but the task will send its result to a [`ResponseVar<R>`].
717///
718/// # Cancellation
719///
720/// Dropping the [`ResponseVar<R>`] does not cancel the `task`, it will still run to completion.
721///
722/// # Panic Handling
723///
724/// If the `task` panics the panic is logged as an error and resumed in the response var modify closure.
725pub fn wait_respond<R, F>(task: F) -> ResponseVar<R>
726where
727    R: VarValue,
728    F: FnOnce() -> R + Send + 'static,
729{
730    let (responder, response) = response_var();
731    spawn_wait(move || match panic::catch_unwind(panic::AssertUnwindSafe(task)) {
732        Ok(r) => responder.respond(r),
733        Err(p) => {
734            let p = TaskPanicError::new(p);
735            tracing::error!("panic in `task::wait_respond`: {}", p.panic_str().unwrap_or(""));
736            responder.modify(move |_| panic::resume_unwind(p.payload));
737        }
738    });
739    response
740}
741
742/// Blocks the thread until the `task` future finishes.
743///
744/// This function is useful for implementing async tests, using it in an app will probably cause
745/// the app to stop responding.
746///
747/// The crate [`futures-lite`] is used to execute the task.
748///
749/// # Examples
750///
751/// Test a [`run`] call:
752///
753/// ```
754/// use zng_task as task;
755/// # use zng_unit::*;
756/// # async fn foo(u: u8) -> Result<u8, ()> { task::deadline(1.ms()).await; Ok(u) }
757///
758/// #[test]
759/// # fn __() { }
760/// pub fn run_ok() {
761///     let r = task::block_on(task::run(async { foo(32).await }));
762///
763///     # let value =
764///     r.expect("foo(32) was not Ok");
765///     # assert_eq!(32, value);
766/// }
767/// # run_ok();
768/// ```
769///
770/// [`futures-lite`]: https://docs.rs/futures-lite/
771pub fn block_on<F>(task: impl IntoFuture<IntoFuture = F>) -> F::Output
772where
773    F: Future,
774{
775    futures_lite::future::block_on(task.into_future())
776}
777
/// Continuously polls the `task` until it finishes.
779///
780/// This function is useful for implementing some async tests only, futures don't expect to be polled
781/// continuously. This function is only available in test builds.
#[cfg(any(test, doc, feature = "test_util"))]
pub fn spin_on<F>(task: impl IntoFuture<IntoFuture = F>) -> F::Output
where
    F: Future,
{
    let mut task = std::pin::pin!(task.into_future());

    // Every pending poll wakes the executor right away, so `block_on` polls again without waiting.
    block_on(future_fn(|cx| {
        let poll = task.as_mut().poll(cx);
        if poll.is_pending() {
            cx.waker().wake_by_ref();
        }
        poll
    }))
}
798
799/// Executor used in async doc tests.
800///
801/// If `spin` is `true` the [`spin_on`] executor is used with a timeout of 500 milliseconds.
/// If `spin` is `false` the [`block_on`] executor is used with a timeout of 5 seconds.
#[cfg(any(test, doc, feature = "test_util"))]
pub fn doc_test<F>(spin: bool, task: impl IntoFuture<IntoFuture = F>) -> F::Output
where
    F: Future,
{
    use zng_unit::TimeUnits;

    // Both executors run the task with a deadline, only the executor and timeout differ.
    let result = if spin {
        spin_on(with_deadline(task, 500.ms()))
    } else {
        block_on(with_deadline(task, 5.secs()))
    };
    result.expect("async doc-test timeout")
}
816
817/// A future that is [`Pending`] once and wakes the current task.
818///
819/// After the first `.await` the future is always [`Ready`] and on the first `.await` it calls [`wake`].
820///
821/// [`Pending`]: std::task::Poll::Pending
822/// [`Ready`]: std::task::Poll::Ready
823/// [`wake`]: std::task::Waker::wake
pub async fn yield_now() {
    // The flag records if the first poll already happened.
    struct YieldNowFut(bool);
    impl Future for YieldNowFut {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
            let already_yielded = std::mem::replace(&mut self.0, true);
            if already_yielded {
                return Poll::Ready(());
            }

            // first poll, request another poll and yield.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }

    YieldNowFut(false).await
}
842
843/// A future that is [`Pending`] until the `deadline` is reached.
844///
845/// # Examples
846///
847/// Await 5 seconds in a [`spawn`] parallel task:
848///
849/// ```
850/// use zng_task as task;
851/// use zng_unit::*;
852///
853/// task::spawn(async {
854///     println!("waiting 5 seconds..");
855///     task::deadline(5.secs()).await;
856///     println!("5 seconds elapsed.")
857/// });
858/// ```
859///
/// The future runs on an app provided timer executor, or on the [`futures_timer`] by default.
///
/// Note that deadlines from [`Duration`](std::time::Duration) start *counting* at the moment this function is called,
/// not at the moment of the first `.await` call.
864///
865/// [`Pending`]: std::task::Poll::Pending
866/// [`futures_timer`]: https://docs.rs/futures-timer
867pub fn deadline(deadline: impl Into<Deadline>) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
868    let deadline = deadline.into();
869    if zng_app_context::LocalContext::current_app().is_some() {
870        DEADLINE_SV.read().0(deadline)
871    } else {
872        default_deadline(deadline)
873    }
874}
875
// Deadline executor of the current app. The flag starts `false` and is set to `true` when
// `DEADLINE_APP::init_deadline_service` installs a service, it is used to detect a second init.
app_local! {
    static DEADLINE_SV: (DeadlineService, bool) = const { (default_deadline, false) };
}

// Signature of a deadline executor, a function that converts a `Deadline` into the boxed
// future returned by `deadline`.
type DeadlineService = fn(Deadline) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
881
882fn default_deadline(deadline: Deadline) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
883    if let Some(timeout) = deadline.time_left() {
884        Box::pin(futures_timer::Delay::new(timeout))
885    } else {
886        Box::pin(std::future::ready(()))
887    }
888}
889
890/// Deadline APP integration.
891#[expect(non_camel_case_types)]
892pub struct DEADLINE_APP;
893
894impl DEADLINE_APP {
895    /// Called by the app implementer to setup the [`deadline`] executor.
896    ///
897    /// If no app calls this the [`futures_timer`] executor is used.
898    ///
899    /// [`futures_timer`]: https://docs.rs/futures-timer
900    ///
901    /// # Panics
902    ///
903    /// Panics if called more than once for the same app.
904    pub fn init_deadline_service(&self, service: DeadlineService) {
905        let (prev, already_set) = mem::replace(&mut *DEADLINE_SV.write(), (service, true));
906        if already_set {
907            *DEADLINE_SV.write() = (prev, true);
908            panic!("deadline service already inited for this app");
909        }
910    }
911}
912
/// Implements a [`Future`] from a closure.
///
/// The closure is called for every poll of the returned future and its return value is the poll
/// result, so it is responsible for arranging a wake using the provided context before
/// returning [`Poll::Pending`].
///
/// # Examples
///
/// A future that is ready when a closure returns `Some(R)`.
///
/// ```
/// use std::task::Poll;
/// use zng_task as task;
///
/// async fn ready_some<R>(mut closure: impl FnMut() -> Option<R>) -> R {
///     task::future_fn(|cx| match closure() {
///         Some(r) => Poll::Ready(r),
///         None => Poll::Pending,
///     })
///     .await
/// }
/// ```
pub async fn future_fn<T, F>(fn_: F) -> T
where
    F: FnMut(&mut std::task::Context) -> Poll<T>,
{
    // the standard library adapter has the same contract as a hand written wrapper,
    // it is `Unpin` and calls the closure by `&mut` on every poll.
    std::future::poll_fn(fn_).await
}
946
/// Error returned by [`with_deadline`] when the time limit is reached before the task finishes.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub struct DeadlineError {}
impl std::error::Error for DeadlineError {}
impl fmt::Display for DeadlineError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("reached deadline")
    }
}
957
958/// Add a [`deadline`] to a future.
959///
960/// Returns the `fut` output or [`DeadlineError`] if the deadline elapses first.
961pub async fn with_deadline<O, F: Future<Output = O>>(
962    fut: impl IntoFuture<IntoFuture = F>,
963    deadline: impl Into<Deadline>,
964) -> Result<F::Output, DeadlineError> {
965    let deadline = deadline.into();
966    any!(async { Ok(fut.await) }, async {
967        self::deadline(deadline).await;
968        Err(DeadlineError {})
969    })
970    .await
971}
972
/// <span data-del-macro-root></span> A future that *zips* other futures.
///
/// The macro input is a comma separated list of future expressions. The macro output is a future
/// that when awaited produces a tuple of results in the same order as the inputs.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// Each input must implement [`IntoFuture`]. Note that each input must be known at compile time, use the [`fn@all`] async
/// function to await on all futures in a dynamic list of futures.
///
/// # Examples
///
/// Await for three different futures to complete:
///
/// ```
/// use zng_task as task;
///
/// # task::doc_test(false, async {
/// let (a, b, c) = task::all!(task::run(async { 'a' }), task::wait(|| "b"), async { b"c" }).await;
/// # });
/// ```
#[macro_export]
macro_rules! all {
    // One to eight futures are matched by explicit arms, each input is paired with a
    // fixed identifier and forwarded to the `__all!` implementation macro.
    ($fut0:expr $(,)?) => { $crate::__all! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__all! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // Any other number of futures is forwarded to the proc-macro together with the
    // path of the implementation macro.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all; $($fut),+ } }
}
1064
// Implementation of `all!`, each input is an identifier paired with a future expression,
// the identifier names the local slot that holds the future and later its output.
#[doc(hidden)]
#[macro_export]
macro_rules! __all {
    ($($ident:ident: $fut:expr;)+) => {
        {
            // every slot starts holding the not yet completed future.
            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use std::task::Poll;

                let mut all_ready = true;

                $(
                    // slots that already hold an output are not polled again.
                    if let $crate::FutureOrOutput::Future(unfinished) = &mut $ident {
                        // SAFETY: the slot is owned by this closure and only accessed by exclusive
                        // borrow during a Future::poll call, the future is never moved out of it.
                        let polled = unsafe { std::pin::Pin::new_unchecked(unfinished) }.poll(cx);
                        match polled {
                            Poll::Ready(output) => $ident = $crate::FutureOrOutput::Output(output),
                            Poll::Pending => all_ready = false,
                        }
                    }
                )+

                if all_ready {
                    // outputs are collected in the same order as the inputs.
                    Poll::Ready(($($ident.take_output()),+))
                } else {
                    Poll::Pending
                }
            })
        }
    }
}
1098
// Slot used by the `all!` family of macros and functions, it holds a future while it is
// pending, then its output until the output is taken.
#[doc(hidden)]
pub enum FutureOrOutput<F: Future> {
    Future(F),
    Output(F::Output),
    Taken,
}
impl<F: Future> FutureOrOutput<F> {
    // Moves the stored output out and leaves `Taken` behind.
    //
    // Panics if the slot does not hold an output.
    pub fn take_output(&mut self) -> F::Output {
        let FutureOrOutput::Output(output) = std::mem::replace(self, Self::Taken) else {
            unreachable!()
        };
        output
    }
}
1113
1114/// A future that awaits on all `futures` at the same time and returns all results when all futures are ready.
1115///
1116/// This is the dynamic version of [`all!`].
1117pub async fn all<F: IntoFuture>(futures: impl IntoIterator<Item = F>) -> Vec<F::Output> {
1118    let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1119    future_fn(move |cx| {
1120        let mut pending = false;
1121        for input in &mut futures {
1122            if let FutureOrOutput::Future(fut) = input {
1123                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1124                // Future::poll call, so it will not move.
1125                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1126                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1127                    *input = FutureOrOutput::Output(r);
1128                } else {
1129                    pending = true;
1130                }
1131            }
1132        }
1133
1134        if pending {
1135            Poll::Pending
1136        } else {
1137            Poll::Ready(futures.iter_mut().map(FutureOrOutput::take_output).collect())
1138        }
1139    })
1140    .await
1141}
1142
/// <span data-del-macro-root></span> A future that awaits for the first future that is ready.
///
/// The macro input is a comma separated list of future expressions, the futures must
/// all have the same output type. The macro output is a future that when awaited produces
/// a single output type instance returned by the first input future that completes.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// If two futures are ready at the same time the result of the first future in the input list is used.
/// After one future is ready the other futures are not polled again and are dropped.
///
/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
/// known at compile time, use the [`fn@any`] async function to await on the first ready future in a dynamic list of futures.
///
/// # Examples
///
/// Await for the first of three futures to complete:
///
/// ```
/// use zng_task as task;
/// use zng_unit::*;
///
/// # task::doc_test(false, async {
/// let r = task::any!(
///     task::run(async {
///         task::deadline(300.ms()).await;
///         'a'
///     }),
///     task::wait(|| 'b'),
///     async {
///         task::deadline(300.ms()).await;
///         'c'
///     }
/// )
/// .await;
///
/// assert_eq!('b', r);
/// # });
/// ```
#[macro_export]
macro_rules! any {
    // One to eight futures are matched by explicit arms, each input is paired with a
    // fixed identifier and forwarded to the `__any!` implementation macro.
    ($fut0:expr $(,)?) => { $crate::__any! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__any! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // Any other number of futures is forwarded to the proc-macro together with the
    // path of the implementation macro.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any; $($fut),+ } }
}
// Implementation of `any!`, each input is an identifier paired with a future expression,
// the identifier names the local that holds the future.
#[doc(hidden)]
#[macro_export]
macro_rules! __any {
    ($($ident:ident: $fut:expr;)+) => {
        {
            $(let mut $ident = std::future::IntoFuture::into_future($fut);)+
            $crate::future_fn(move |cx| {
                use std::task::Poll;
                $(
                    // SAFETY: the future is owned by this closure and only accessed by exclusive
                    // borrow during a Future::poll call, it is never moved out of the closure.
                    let polled = unsafe { std::pin::Pin::new_unchecked(&mut $ident) }.poll(cx);
                    if polled.is_ready() {
                        // first ready future in input order wins, the others are not polled.
                        return polled;
                    }
                )+

                Poll::Pending
            })
        }
    }
}
// Proc-macro used by the last arm of the `all!` family of macros, it receives the path of the
// implementation macro and the list of future expressions.
#[doc(hidden)]
pub use zng_task_proc_macros::task_any_all as __proc_any_all;
1276
1277/// A future that awaits on all `futures` at the same time and returns the first result when the first future is ready.
1278///
1279/// This is the dynamic version of [`any!`].
1280pub async fn any<F: IntoFuture>(futures: impl IntoIterator<Item = F>) -> F::Output {
1281    let mut futures: Vec<_> = futures.into_iter().map(IntoFuture::into_future).collect();
1282    future_fn(move |cx| {
1283        for fut in &mut futures {
1284            // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1285            // Future::poll call, so it will not move.
1286            let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1287            if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1288                return Poll::Ready(r);
1289            }
1290        }
1291        Poll::Pending
1292    })
1293    .await
1294}
1295
/// <span data-del-macro-root></span> A future that waits for the first future that is ready with an `Ok(T)` result.
///
/// The macro input is a comma separated list of future expressions, the futures must
/// all have the same output `Result<T, E>` type, but each can have a different `E`. The macro output is a future
/// that when awaited produces a single output of type `Result<T, (E0, E1, ..)>` that is `Ok(T)` if any of the futures
/// is `Ok(T)` or is `Err((E0, E1, ..))` if all futures are `Err`.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// If two futures are ready and `Ok(T)` at the same time the result of the first future in the input list is used.
/// After one future is ready and `Ok(T)` the other futures are not polled again and are dropped. After a future
/// is ready and `Err(E)` it is also not polled again and dropped.
///
/// Each input must implement [`IntoFuture`] with a `Result<T, E>` output that has the same `T` type. Note that each input must be
/// known at compile time, use the [`fn@any_ok`] async function to await on all futures in a dynamic list of futures.
///
/// # Examples
///
/// Await for the first of three futures to complete with `Ok`:
///
/// ```
/// use zng_task as task;
/// # #[derive(Debug, PartialEq)]
/// # pub struct FooError;
/// # task::doc_test(false, async {
/// let r = task::any_ok!(
///     task::run(async { Err::<char, _>("error") }),
///     task::wait(|| Ok::<_, FooError>('b')),
///     async { Err::<char, _>(FooError) }
/// )
/// .await;
///
/// assert_eq!(Ok('b'), r);
/// # });
/// ```
#[macro_export]
macro_rules! any_ok {
    // One to eight futures are matched by explicit arms, each input is paired with a
    // fixed identifier and forwarded to the `__any_ok!` implementation macro.
    ($fut0:expr $(,)?) => { $crate::__any_ok! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__any_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // Any other number of futures is forwarded to the proc-macro together with the
    // path of the implementation macro.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any_ok; $($fut),+ } }
}
1401
// Implementation of `any_ok!`, each input is an identifier paired with a future expression,
// the identifier names the local slot that holds the future and later its error.
//
// The `Ok` type of the inputs does not need to implement `Debug`, the errors are
// extracted with a `match` and not with `Result::unwrap_err`.
#[doc(hidden)]
#[macro_export]
macro_rules! __any_ok {
    ($($ident:ident: $fut: expr;)+) => {
        {
            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use std::task::Poll;

                let mut pending = false;

                $(
                    // slots that already hold an error are not polled again.
                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
                        // SAFETY: the closure owns $ident and is an exclusive borrow inside a
                        // Future::poll call, so it will not move.
                        let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
                        if let Poll::Ready(r) = fut.as_mut().poll(cx) {
                            match r {
                                // first `Ok` in input order wins.
                                Ok(r) => return Poll::Ready(Ok(r)),
                                Err(e) => {
                                    $ident = $crate::FutureOrOutput::Output(Err(e));
                                }
                            }
                        } else {
                            pending = true;
                        }
                    }
                )+

                if pending {
                    Poll::Pending
                } else {
                    Poll::Ready(Err((
                        $(match $ident.take_output() {
                            Err(e) => e,
                            // an `Ok` returns early above, so only errors are stored.
                            Ok(_) => unreachable!(),
                        }),+
                    )))
                }
            })
        }
    }
}
1442
1443/// A future that awaits on all `futures` at the same time and returns when any future is `Ok(_)` or all are `Err(_)`.
1444///
1445/// This is the dynamic version of [`all_some!`].
1446pub async fn any_ok<Ok, Err, F: IntoFuture<Output = Result<Ok, Err>>>(futures: impl IntoIterator<Item = F>) -> Result<Ok, Vec<Err>> {
1447    let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1448    future_fn(move |cx| {
1449        let mut pending = false;
1450        for input in &mut futures {
1451            if let FutureOrOutput::Future(fut) = input {
1452                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1453                // Future::poll call, so it will not move.
1454                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1455                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1456                    match r {
1457                        Ok(r) => return Poll::Ready(Ok(r)),
1458                        Err(e) => *input = FutureOrOutput::Output(Err(e)),
1459                    }
1460                } else {
1461                    pending = true;
1462                }
1463            }
1464        }
1465
1466        if pending {
1467            Poll::Pending
1468        } else {
1469            Poll::Ready(Err(futures
1470                .iter_mut()
1471                .map(|f| match f.take_output() {
1472                    Ok(_) => unreachable!(),
1473                    Err(e) => e,
1474                })
1475                .collect()))
1476        }
1477    })
1478    .await
1479}
1480
/// <span data-del-macro-root></span> A future that is ready when any of the futures is ready and `Some(T)`.
///
/// The macro input is a comma separated list of future expressions, the futures must
/// all have the same output `Option<T>` type. The macro output is a future that when awaited produces
/// a single output type instance returned by the first input future that completes with a `Some`.
/// If all futures complete with a `None` the output is `None`.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// If two futures are ready and `Some(T)` at the same time the result of the first future in the input list is used.
/// After one future is ready and `Some(T)` the other futures are not polled again and are dropped. After a future
/// is ready and `None` it is also not polled again and dropped.
///
/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
/// known at compile time, use the [`fn@any_some`] async function to await on all futures in a dynamic list of futures.
///
/// # Examples
///
/// Await for the first of three futures to complete with `Some`:
///
/// ```
/// use zng_task as task;
/// # task::doc_test(false, async {
/// let r = task::any_some!(task::run(async { None::<char> }), task::wait(|| Some('b')), async { None::<char> }).await;
///
/// assert_eq!(Some('b'), r);
/// # });
/// ```
#[macro_export]
macro_rules! any_some {
    // One to eight futures are matched by explicit arms, each input is paired with a
    // fixed identifier and forwarded to the `__any_some!` implementation macro.
    ($fut0:expr $(,)?) => { $crate::__any_some! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__any_some! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // Any other number of futures is forwarded to the proc-macro together with the
    // path of the implementation macro.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any_some; $($fut),+ } }
}
1579
// Implementation of `any_some!`, each input is an identifier paired with a future expression,
// the identifier names the local slot that holds the future until it completes with `None`.
#[doc(hidden)]
#[macro_export]
macro_rules! __any_some {
    ($($ident:ident: $fut: expr;)+) => {
        {
            $(let mut $ident = Some(std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use std::task::Poll;

                let mut any_pending = false;

                $(
                    // emptied slots are futures that already completed with `None`.
                    if let Some(unfinished) = &mut $ident {
                        // SAFETY: the slot is owned by this closure and only accessed by exclusive
                        // borrow during a Future::poll call, the future is never moved out of it.
                        let polled = unsafe { std::pin::Pin::new_unchecked(unfinished) }.poll(cx);
                        match polled {
                            // first `Some` in input order wins.
                            Poll::Ready(Some(value)) => return Poll::Ready(Some(value)),
                            Poll::Ready(None) => $ident = None,
                            Poll::Pending => any_pending = true,
                        }
                    }
                )+

                if any_pending {
                    Poll::Pending
                } else {
                    Poll::Ready(None)
                }
            })
        }
    }
}
1616
1617/// A future that awaits on all `futures` at the same time and returns when any future is `Some(_)` or all are `None`.
1618///
1619/// This is the dynamic version of [`all_some!`].
1620pub async fn any_some<Some, F: IntoFuture<Output = Option<Some>>>(futures: impl IntoIterator<Item = F>) -> Option<Some> {
1621    let mut futures: Vec<_> = futures.into_iter().map(|f| Some(f.into_future())).collect();
1622    future_fn(move |cx| {
1623        let mut pending = false;
1624        for input in &mut futures {
1625            if let Some(fut) = input {
1626                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1627                // Future::poll call, so it will not move.
1628                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1629                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1630                    match r {
1631                        Some(r) => return Poll::Ready(Some(r)),
1632                        None => *input = None,
1633                    }
1634                } else {
1635                    pending = true;
1636                }
1637            }
1638        }
1639
1640        if pending { Poll::Pending } else { Poll::Ready(None) }
1641    })
1642    .await
1643}
1644
/// <span data-del-macro-root></span> A future that is ready when all futures are ready with an `Ok(T)` result or
/// any future is ready with an `Err(E)` result.
///
/// The output type is `Result<(T0, T1, ..), E>`, the `Ok` type is a tuple with all the `Ok` values, the error
/// type is the first error encountered, the input futures must have the same `Err` type but can have different
/// `Ok` types.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// If two futures are ready and `Err(E)` at the same time the result of the first future in the input list is used.
/// After one future is ready and `Err(E)` the other futures are not polled again and are dropped. After a future
/// is ready it is also not polled again and dropped.
///
/// Each input must implement [`IntoFuture`] with a `Result<T, E>` output that has the same `E` type. Note that each input must be
/// known at compile time, use the [`fn@all_ok`] async function to await on all futures in a dynamic list of futures.
///
/// # Examples
///
/// Await for three futures to all complete with `Ok(T)`:
///
/// ```
/// use zng_task as task;
/// # #[derive(Debug, PartialEq)]
/// # struct FooError;
/// # task::doc_test(false, async {
/// let r = task::all_ok!(
///     task::run(async { Ok::<_, FooError>('a') }),
///     task::wait(|| Ok::<_, FooError>('b')),
///     async { Ok::<_, FooError>('c') }
/// )
/// .await;
///
/// assert_eq!(Ok(('a', 'b', 'c')), r);
/// # });
/// ```
///
/// And if any completes with `Err(E)`:
///
/// ```
/// use zng_task as task;
/// # #[derive(Debug, PartialEq)]
/// # struct FooError;
/// # task::doc_test(false, async {
/// let r = task::all_ok!(task::run(async { Ok('a') }), task::wait(|| Err::<char, _>(FooError)), async {
///     Ok('c')
/// })
/// .await;
///
/// assert_eq!(Err(FooError), r);
/// # });
/// ```
#[macro_export]
macro_rules! all_ok {
    // One to eight futures are matched by explicit arms, each input is paired with a
    // fixed identifier and forwarded to the `__all_ok!` implementation macro.
    ($fut0:expr $(,)?) => { $crate::__all_ok! { fut0: $fut0; } };
    ($fut0:expr, $fut1:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
        }
    };
    ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
        $crate::__all_ok! {
            fut0: $fut0;
            fut1: $fut1;
            fut2: $fut2;
            fut3: $fut3;
            fut4: $fut4;
            fut5: $fut5;
            fut6: $fut6;
            fut7: $fut7;
        }
    };
    // Any other number of futures is forwarded to the proc-macro together with the
    // path of the implementation macro.
    ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all_ok; $($fut),+ } }
}
1766
// Expansion target of `all_ok!`.
//
// Input is a list of `ident: future_expr;` pairs, each identifier names the local slot that
// holds the future and later its `Ok` output. The expansion is a future that polls every
// unresolved slot on each poll, resolves to the first `Err(_)` observed or to `Ok` with all
// the values once every slot is resolved.
#[doc(hidden)]
#[macro_export]
macro_rules! __all_ok {
    ($($ident:ident: $fut: expr;)+) => {
        {
            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use std::task::Poll;

                let mut pending = false;

                $(
                    if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
                        // SAFETY: `$ident` is owned by this closure and is only reached through the
                        // closure's exclusive borrow during a poll, it is never moved out while it
                        // holds the future, only overwritten in place. This relies on `future_fn`
                        // not moving the closure between polls.
                        let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
                        if let Poll::Ready(r) = fut.as_mut().poll(cx) {
                            match r {
                                Ok(r) => {
                                    $ident = $crate::FutureOrOutput::Output(Ok(r))
                                },
                                // first error ends the future, the other inputs are not polled again
                                Err(e) => return Poll::Ready(Err(e)),
                            }
                        } else {
                            pending = true;
                        }
                    }
                )+

                if pending {
                    Poll::Pending
                } else {
                    // Only `Ok` outputs are ever stored in the slots. `unwrap_or_else` is used
                    // instead of `unwrap` so the error type is not required to implement `Debug`,
                    // same as the `all_ok` function.
                    Poll::Ready(Ok((
                        $($ident.take_output().unwrap_or_else(|_| unreachable!())),+
                    )))
                }
            })
        }
    }
}
1807
1808/// A future that awaits on all `futures` at the same time and returns when all futures are `Ok(_)` or any future is `Err(_)`.
1809///
1810/// This is the dynamic version of [`all_ok!`].
1811pub async fn all_ok<Ok, Err, F: IntoFuture<Output = Result<Ok, Err>>>(futures: impl IntoIterator<Item = F>) -> Result<Vec<Ok>, Err> {
1812    let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1813    future_fn(move |cx| {
1814        let mut pending = false;
1815        for input in &mut futures {
1816            if let FutureOrOutput::Future(fut) = input {
1817                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1818                // Future::poll call, so it will not move.
1819                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1820                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1821                    match r {
1822                        Ok(r) => *input = FutureOrOutput::Output(Ok(r)),
1823                        Err(e) => return Poll::Ready(Err(e)),
1824                    }
1825                } else {
1826                    pending = true;
1827                }
1828            }
1829        }
1830
1831        if pending {
1832            Poll::Pending
1833        } else {
1834            Poll::Ready(Ok(futures
1835                .iter_mut()
1836                .map(|f| f.take_output().unwrap_or_else(|_| unreachable!()))
1837                .collect()))
1838        }
1839    })
1840    .await
1841}
1842
/// <span data-del-macro-root></span> A future that is ready when all futures are ready with `Some(T)` or when any
/// future is ready with `None`.
///
/// The macro input is a comma separated list of future expressions, the futures must
/// all have the `Option<T>` output type, but each can have a different `T`. The macro output is a future that when ".awaited"
/// produces `Some((T0, T1, ..))` if all futures were `Some(T)` or `None` if any of the futures was `None`.
///
/// At least one input future is required and any number of futures is accepted. For more than
/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
/// some IDEs.
///
/// After one future is ready with `None` the other futures are not polled again and are dropped. After a future
/// is ready it is also not polled again and dropped.
///
/// Each input must implement [`IntoFuture`] with an `Option<T>` output type. Note that each input must be
/// known at compile time, use the [`fn@all_some`] async function to await on all futures in a dynamic list of futures.
///
/// # Examples
///
/// Await for all three futures to complete with `Some`:
///
/// ```
/// use zng_task as task;
/// # task::doc_test(false, async {
/// let r = task::all_some!(task::run(async { Some('a') }), task::wait(|| Some('b')), async { Some('c') }).await;
///
/// assert_eq!(Some(('a', 'b', 'c')), r);
/// # });
/// ```
///
/// Completes with `None` if any future completes with `None`:
///
/// ```
/// # use zng_task as task;
/// # task::doc_test(false, async {
/// let r = task::all_some!(task::run(async { Some('a') }), task::wait(|| None::<char>), async { Some('b') }).await;
///
/// assert_eq!(None, r);
/// # });
/// ```
#[macro_export]
macro_rules! all_some {
    // One to eight inputs: each future expression is paired with a fixed local identifier
    // and forwarded to the declarative `__all_some!` helper.
    ($f0:expr $(,)?) => {
        $crate::__all_some! { fut0: $f0; }
    };
    ($f0:expr, $f1:expr $(,)?) => {
        $crate::__all_some! { fut0: $f0; fut1: $f1; }
    };
    ($f0:expr, $f1:expr, $f2:expr $(,)?) => {
        $crate::__all_some! { fut0: $f0; fut1: $f1; fut2: $f2; }
    };
    ($f0:expr, $f1:expr, $f2:expr, $f3:expr $(,)?) => {
        $crate::__all_some! { fut0: $f0; fut1: $f1; fut2: $f2; fut3: $f3; }
    };
    ($f0:expr, $f1:expr, $f2:expr, $f3:expr, $f4:expr $(,)?) => {
        $crate::__all_some! { fut0: $f0; fut1: $f1; fut2: $f2; fut3: $f3; fut4: $f4; }
    };
    ($f0:expr, $f1:expr, $f2:expr, $f3:expr, $f4:expr, $f5:expr $(,)?) => {
        $crate::__all_some! { fut0: $f0; fut1: $f1; fut2: $f2; fut3: $f3; fut4: $f4; fut5: $f5; }
    };
    ($f0:expr, $f1:expr, $f2:expr, $f3:expr, $f4:expr, $f5:expr, $f6:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $f0; fut1: $f1; fut2: $f2; fut3: $f3;
            fut4: $f4; fut5: $f5; fut6: $f6;
        }
    };
    ($f0:expr, $f1:expr, $f2:expr, $f3:expr, $f4:expr, $f5:expr, $f6:expr, $f7:expr $(,)?) => {
        $crate::__all_some! {
            fut0: $f0; fut1: $f1; fut2: $f2; fut3: $f3;
            fut4: $f4; fut5: $f5; fut6: $f6; fut7: $f7;
        }
    };
    // More than eight inputs are expanded by the proc-macro, with `__all_some!` as the target.
    ($($fut:expr),+ $(,)?) => {
        $crate::__proc_any_all! { $crate::__all_some; $($fut),+ }
    };
}
1951
// Expansion target of `all_some!`.
//
// Input is a list of `ident: future_expr;` pairs, each identifier names the local slot that
// holds the future and later its `Some` output. The expansion is a future that polls every
// unresolved slot on each poll, resolves to `None` as soon as any input is `None` or to
// `Some` with all the values once every slot is resolved.
#[doc(hidden)]
#[macro_export]
macro_rules! __all_some {
    ($($ident:ident: $fut: expr;)+) => {
        {
            $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
            $crate::future_fn(move |cx| {
                use std::task::Poll;

                let mut any_pending = false;

                $(
                    if let $crate::FutureOrOutput::Future(slot) = &mut $ident {
                        // SAFETY: `$ident` is owned by this closure and is only reached through the
                        // closure's exclusive borrow during a poll, it is never moved out while it
                        // holds the future, only overwritten in place. This relies on `future_fn`
                        // not moving the closure between polls.
                        let polled = unsafe { std::pin::Pin::new_unchecked(slot) }.poll(cx);
                        match polled {
                            // first `None` ends the future, the other inputs are not polled again
                            Poll::Ready(None) => return Poll::Ready(None),
                            Poll::Ready(some) => $ident = $crate::FutureOrOutput::Output(some),
                            Poll::Pending => any_pending = true,
                        }
                    }
                )+

                if any_pending {
                    return Poll::Pending;
                }

                // only `Some` outputs are stored in the slots
                Poll::Ready(Some((
                    $($ident.take_output().unwrap()),+
                )))
            })
        }
    }
}
1991
1992/// A future that awaits on all `futures` at the same time and returns when all futures are `Some(_)` or any future is `None`.
1993///
1994/// This is the dynamic version of [`all_some!`].
1995pub async fn all_some<Some, F: IntoFuture<Output = Option<Some>>>(futures: impl IntoIterator<Item = F>) -> Option<Vec<Some>> {
1996    let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1997    future_fn(move |cx| {
1998        let mut pending = false;
1999        for input in &mut futures {
2000            if let FutureOrOutput::Future(fut) = input {
2001                // SAFETY: the closure owns $ident and is an exclusive borrow inside a
2002                // Future::poll call, so it will not move.
2003                let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
2004                if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
2005                    match r {
2006                        Some(r) => *input = FutureOrOutput::Output(Some(r)),
2007                        None => return Poll::Ready(None),
2008                    }
2009                } else {
2010                    pending = true;
2011                }
2012            }
2013        }
2014
2015        if pending {
2016            Poll::Pending
2017        } else {
2018            Poll::Ready(Some(futures.iter_mut().map(|f| f.take_output().unwrap()).collect()))
2019        }
2020    })
2021    .await
2022}
2023
2024/// A future that will await until [`set`] is called.
2025///
2026/// # Examples
2027///
2028/// Spawns a parallel task that only writes to stdout after the main thread sets the signal:
2029///
2030/// ```
2031/// use zng_clone_move::async_clmv;
2032/// use zng_task::{self as task, *};
2033///
2034/// let signal = SignalOnce::default();
2035///
2036/// task::spawn(async_clmv!(signal, {
2037///     signal.await;
2038///     println!("After Signal!");
2039/// }));
2040///
2041/// signal.set();
2042/// ```
2043///
2044/// [`set`]: SignalOnce::set
2045#[derive(Default, Clone)]
2046pub struct SignalOnce(Arc<SignalInner>);
2047impl fmt::Debug for SignalOnce {
2048    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2049        write!(f, "SignalOnce({})", self.is_set())
2050    }
2051}
2052impl PartialEq for SignalOnce {
2053    fn eq(&self, other: &Self) -> bool {
2054        Arc::ptr_eq(&self.0, &other.0)
2055    }
2056}
2057impl Eq for SignalOnce {}
2058impl Hash for SignalOnce {
2059    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
2060        Arc::as_ptr(&self.0).hash(state)
2061    }
2062}
2063impl SignalOnce {
2064    /// New unsigned.
2065    pub fn new() -> Self {
2066        Self::default()
2067    }
2068
2069    /// New signaled.
2070    pub fn new_set() -> Self {
2071        let s = Self::new();
2072        s.set();
2073        s
2074    }
2075
2076    /// If the signal was set.
2077    pub fn is_set(&self) -> bool {
2078        self.0.signaled.load(Ordering::Relaxed)
2079    }
2080
2081    /// Sets the signal and awakes listeners.
2082    pub fn set(&self) {
2083        if !self.0.signaled.swap(true, Ordering::Relaxed) {
2084            let listeners = mem::take(&mut *self.0.listeners.lock());
2085            for listener in listeners {
2086                listener.wake();
2087            }
2088        }
2089    }
2090}
2091impl Future for SignalOnce {
2092    type Output = ();
2093
2094    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<()> {
2095        if self.0.signaled.load(Ordering::Relaxed) {
2096            return Poll::Ready(());
2097        }
2098
2099        let mut listeners = self.0.listeners.lock();
2100        if self.0.signaled.load(Ordering::Relaxed) {
2101            return Poll::Ready(());
2102        }
2103
2104        let waker = cx.waker();
2105        if !listeners.iter().any(|w| w.will_wake(waker)) {
2106            listeners.push(waker.clone());
2107        }
2108
2109        Poll::Pending
2110    }
2111}
2112
2113#[derive(Default)]
2114struct SignalInner {
2115    signaled: AtomicBool,
2116    listeners: Mutex<Vec<std::task::Waker>>,
2117}
2118
2119/// A [`Waker`] that dispatches a wake call to multiple other wakers.
2120///
2121/// This is useful for sharing one wake source with multiple [`Waker`] clients that may not be all
2122/// known at the moment the first request is made.
2123///  
2124/// [`Waker`]: std::task::Waker
2125#[derive(Clone)]
2126pub struct McWaker(Arc<WakeVec>);
2127
2128#[derive(Default)]
2129struct WakeVec(Mutex<Vec<std::task::Waker>>);
2130impl WakeVec {
2131    fn push(&self, waker: std::task::Waker) -> bool {
2132        let mut v = self.0.lock();
2133
2134        let return_waker = v.is_empty();
2135
2136        v.push(waker);
2137
2138        return_waker
2139    }
2140
2141    fn cancel(&self) {
2142        let mut v = self.0.lock();
2143
2144        debug_assert!(!v.is_empty(), "called cancel on an empty McWaker");
2145
2146        v.clear();
2147    }
2148}
2149impl std::task::Wake for WakeVec {
2150    fn wake(self: Arc<Self>) {
2151        for w in mem::take(&mut *self.0.lock()) {
2152            w.wake();
2153        }
2154    }
2155}
2156impl McWaker {
2157    /// New empty waker.
2158    pub fn empty() -> Self {
2159        Self(Arc::new(WakeVec::default()))
2160    }
2161
2162    /// Register a `waker` to wake once when `self` awakes.
2163    ///
2164    /// Returns `Some(self as waker)` if `self` was previously empty, if `None` is returned [`Poll::Pending`] must
2165    /// be returned, if a waker is returned the shared resource must be polled using the waker, if the shared resource
2166    /// is ready [`cancel`] must be called.
2167    ///
2168    /// [`cancel`]: Self::cancel
2169    pub fn push(&self, waker: std::task::Waker) -> Option<std::task::Waker> {
2170        if self.0.push(waker) { Some(self.0.clone().into()) } else { None }
2171    }
2172
2173    /// Clear current registered wakers.
2174    pub fn cancel(&self) {
2175        self.0.cancel()
2176    }
2177}
2178
/// Panic payload, captured by [`std::panic::catch_unwind`].
#[non_exhaustive]
pub struct TaskPanicError {
    /// Panic payload.
    pub payload: Box<dyn Any + Send + 'static>,
}
impl TaskPanicError {
    /// New from panic payload.
    pub fn new(payload: Box<dyn Any + Send + 'static>) -> Self {
        Self { payload }
    }

    /// Get the panic message if the `payload` is a `&'static str` or a `String`.
    pub fn panic_str(&self) -> Option<&str> {
        match self.payload.downcast_ref::<&str>() {
            Some(literal) => Some(*literal),
            None => self.payload.downcast_ref::<String>().map(String::as_str),
        }
    }
}
impl fmt::Debug for TaskPanicError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // the payload is opaque, only the message is shown, when there is one
        let mut out = f.debug_struct("TaskPanicError");
        out.field("panic_str()", &self.panic_str());
        out.finish()
    }
}
impl fmt::Display for TaskPanicError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // writes nothing when the payload is not string like
        match self.panic_str() {
            Some(message) => f.write_str(message),
            None => Ok(()),
        }
    }
}
impl std::error::Error for TaskPanicError {}
2213
// Boxed callback that receives the panic captured from a spawned task.
type SpawnPanicHandler = Box<dyn FnMut(TaskPanicError) + Send + 'static>;

app_local! {
    // Handler registered by `set_spawn_panic_handler`, `None` while no handler is set.
    //
    // Mutex for Sync only, `on_spawn_panic` calls the handler through `get_mut`.
    static SPAWN_PANIC_HANDLERS: Option<Mutex<SpawnPanicHandler>> = None;
}
2220
2221/// Set a `handler` that is called when spawn tasks panic.
2222///
2223/// On panic the tasks [`spawn`], [`poll_spawn`] and [`spawn_wait`] log an error, notifies the `handler` and otherwise ignores the panic.
2224///
2225/// The handler is set for the process lifetime, only handler can be set per app. The handler is called inside the same [`LocalContext`]
2226/// and thread the task that panicked was called in.
2227///
2228/// ```
2229/// # macro_rules! example { () => {
2230/// task::set_spawn_panic_handler(|p| {
2231///     UPDATES
2232///         .run_hn_once(hn_once!(|_| {
2233///             std::panic::resume_unwind(p.payload);
2234///         }))
2235///         .perm();
2236/// });
2237/// # }}
2238/// ```
2239///
2240/// The example above shows how to set a handler that propagates the panic to the app main thread.
2241///
2242/// # Panics
2243///
2244/// Panics if another handler is already set in the same app.
2245///
2246/// Panics if no app is running in the caller thread.
2247pub fn set_spawn_panic_handler(handler: impl FnMut(TaskPanicError) + Send + 'static) {
2248    let mut h = SPAWN_PANIC_HANDLERS.try_write().expect("a spawn panic handler is already set");
2249    assert!(h.is_none(), "a spawn panic handler is already set");
2250    *h = Some(Mutex::new(Box::new(handler)));
2251}
2252
2253fn on_spawn_panic(p: TaskPanicError) {
2254    if let Some(f) = &mut *SPAWN_PANIC_HANDLERS.write() {
2255        f.get_mut()(p)
2256    }
2257}