zng_task/lib.rs
1#![doc(html_favicon_url = "https://raw.githubusercontent.com/zng-ui/zng/main/examples/image/res/zng-logo-icon.png")]
2#![doc(html_logo_url = "https://raw.githubusercontent.com/zng-ui/zng/main/examples/image/res/zng-logo.png")]
3//!
4//! Parallel async tasks and async task runners.
5//!
6//! # Crate
7//!
8#![doc = include_str!(concat!("../", std::env!("CARGO_PKG_README")))]
9#![warn(unused_extern_crates)]
10#![warn(missing_docs)]
11
12use std::{
13 fmt,
14 hash::Hash,
15 mem, panic,
16 pin::Pin,
17 sync::{
18 Arc,
19 atomic::{AtomicBool, Ordering},
20 },
21 task::Poll,
22};
23
24#[doc(no_inline)]
25pub use parking_lot;
26use parking_lot::Mutex;
27
28mod crate_util;
29
30use crate::crate_util::PanicResult;
31use zng_app_context::{LocalContext, app_local};
32use zng_time::Deadline;
33use zng_var::{AnyVar, ResponseVar, VarValue, response_done_var, response_var};
34
35#[cfg(test)]
36mod tests;
37
38#[doc(no_inline)]
39pub use rayon;
40
41/// Async filesystem primitives.
42///
43/// This module is the [async-fs](https://docs.rs/async-fs) crate re-exported for convenience.
44pub mod fs {
45 #[doc(inline)]
46 pub use async_fs::*;
47}
48
49pub mod channel;
50pub mod io;
51mod ui;
52
53pub mod http;
54
55pub mod ipc;
56
57mod rayon_ctx;
58
59pub use rayon_ctx::*;
60
61pub use ui::*;
62
63mod progress;
64pub use progress::*;
65
66/// Spawn a parallel async task, this function is not blocking and the `task` starts executing immediately.
67///
68/// # Parallel
69///
70/// The task runs in the primary [`rayon`] thread-pool, every [`poll`](Future::poll) happens inside a call to `rayon::spawn`.
71///
72/// You can use parallel iterators, `join` or any of rayon's utilities inside `task` to make it multi-threaded,
73/// otherwise it will run in a single thread at a time, still not blocking the UI.
74///
75/// The [`rayon`] crate is re-exported in `task::rayon` for convenience and compatibility.
76///
77/// # Async
78///
79/// The `task` is also a future so you can `.await`, after each `.await` the task continues executing in whatever `rayon` thread
80/// is free, so the `task` should either be doing CPU intensive work or awaiting, blocking IO operations
81/// block the thread from being used by other tasks reducing overall performance. You can use [`wait`] for IO
82/// or blocking operations and for networking you can use any of the async crates, as long as they start their own *event reactor*.
83///
84/// The `task` lives inside the [`Waker`] when awaiting and inside `rayon::spawn` when running.
85///
86/// # Examples
87///
88/// ```
89/// # use zng_task::{self as task, *, rayon::iter::*};
90/// # use zng_var::*;
91/// # struct SomeStruct { sum_response: ResponseVar<usize> }
92/// # impl SomeStruct {
93/// fn on_event(&mut self) {
94/// let (responder, response) = response_var();
95/// self.sum_response = response;
96///
97/// task::spawn(async move {
98/// let r = (0..1000).into_par_iter().map(|i| i * i).sum();
99///
100/// responder.respond(r);
101/// });
102/// }
103///
104/// fn on_update(&mut self) {
105/// if let Some(result) = self.sum_response.rsp_new() {
106/// println!("sum of squares 0..1000: {result}");
107/// }
108/// }
109/// # }
110/// ```
111///
112/// The example uses the `rayon` parallel iterator to compute a result and uses a [`response_var`] to send the result to the UI.
113/// The task captures the caller [`LocalContext`] so the response variable will set correctly.
114///
115/// Note that this function is the most basic way to spawn a parallel task where you must setup channels to the rest of the app yourself,
116/// you can use [`respond`] to avoid having to manually set a response, or [`run`] to `.await` the result.
117///
118/// # Panic Handling
119///
120/// If the `task` panics the panic message is logged as an error, the panic is otherwise ignored.
121///
122/// # Unwind Safety
123///
124/// This function disables the [unwind safety validation], meaning that in case of a panic shared
125/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
126/// poisoning mutexes or atomics to mutate shared data or use [`run_catch`] to detect a panic or [`run`]
127/// to propagate a panic.
128///
129/// [unwind safety validation]: std::panic::UnwindSafe
130/// [`Waker`]: std::task::Waker
131/// [`rayon`]: https://docs.rs/rayon
132/// [`LocalContext`]: zng_app_context::LocalContext
133/// [`response_var`]: zng_var::response_var
134pub fn spawn<F>(task: impl IntoFuture<IntoFuture = F>)
135where
136 F: Future<Output = ()> + Send + 'static,
137{
138 Arc::new(RayonTask {
139 ctx: LocalContext::capture(),
140 fut: Mutex::new(Some(Box::pin(task.into_future()))),
141 })
142 .poll()
143}
144
145/// Polls the `task` once immediately on the calling thread, if the `task` is pending, continues execution in [`spawn`].
146pub fn poll_spawn<F>(task: impl IntoFuture<IntoFuture = F>)
147where
148 F: Future<Output = ()> + Send + 'static,
149{
150 struct PollRayonTask {
151 fut: Mutex<Option<(RayonSpawnFut, Option<LocalContext>)>>,
152 }
153 impl PollRayonTask {
154 // start task in calling thread
155 fn poll(self: Arc<Self>) {
156 let mut task = self.fut.lock();
157 let (mut t, _) = task.take().unwrap();
158
159 let waker = self.clone().into();
160
161 match t.as_mut().poll(&mut std::task::Context::from_waker(&waker)) {
162 Poll::Ready(()) => {}
163 Poll::Pending => {
164 let ctx = LocalContext::capture();
165 *task = Some((t, Some(ctx)));
166 }
167 }
168 }
169 }
170 impl std::task::Wake for PollRayonTask {
171 fn wake(self: Arc<Self>) {
172 // continue task in spawn threads
173 if let Some((task, Some(ctx))) = self.fut.lock().take() {
174 Arc::new(RayonTask {
175 ctx,
176 fut: Mutex::new(Some(Box::pin(task))),
177 })
178 .poll();
179 }
180 }
181 }
182
183 Arc::new(PollRayonTask {
184 fut: Mutex::new(Some((Box::pin(task.into_future()), None))),
185 })
186 .poll()
187}
188
189type RayonSpawnFut = Pin<Box<dyn Future<Output = ()> + Send>>;
190
191// A future that is its own waker that polls inside rayon spawn tasks.
192struct RayonTask {
193 ctx: LocalContext,
194 fut: Mutex<Option<RayonSpawnFut>>,
195}
196impl RayonTask {
197 fn poll(self: Arc<Self>) {
198 rayon::spawn(move || {
199 // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
200 let mut task = self.fut.lock();
201 if let Some(mut t) = task.take() {
202 let waker = self.clone().into();
203
204 // load app context
205 self.ctx.clone().with_context(move || {
206 let r = panic::catch_unwind(panic::AssertUnwindSafe(move || {
207 // poll future
208 if t.as_mut().poll(&mut std::task::Context::from_waker(&waker)).is_pending() {
209 // not done
210 *task = Some(t);
211 }
212 }));
213 if let Err(p) = r {
214 tracing::error!("panic in `task::spawn`: {}", crate_util::panic_str(&p));
215 }
216 });
217 }
218 })
219 }
220}
221impl std::task::Wake for RayonTask {
222 fn wake(self: Arc<Self>) {
223 self.poll()
224 }
225}
226
227/// Rayon join with local context.
228///
229/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
230/// operations.
231///
232/// See `rayon::join` for more details about join.
233///
234/// [`LocalContext`]: zng_app_context::LocalContext
235pub fn join<A, B, RA, RB>(op_a: A, op_b: B) -> (RA, RB)
236where
237 A: FnOnce() -> RA + Send,
238 B: FnOnce() -> RB + Send,
239 RA: Send,
240 RB: Send,
241{
242 self::join_context(move |_| op_a(), move |_| op_b())
243}
244
245/// Rayon join context with local context.
246///
247/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
248/// operations.
249///
250/// See `rayon::join_context` for more details about join.
251///
252/// [`LocalContext`]: zng_app_context::LocalContext
253pub fn join_context<A, B, RA, RB>(op_a: A, op_b: B) -> (RA, RB)
254where
255 A: FnOnce(rayon::FnContext) -> RA + Send,
256 B: FnOnce(rayon::FnContext) -> RB + Send,
257 RA: Send,
258 RB: Send,
259{
260 let ctx = LocalContext::capture();
261 let ctx = &ctx;
262 rayon::join_context(
263 move |a| {
264 if a.migrated() {
265 ctx.clone().with_context(|| op_a(a))
266 } else {
267 op_a(a)
268 }
269 },
270 move |b| {
271 if b.migrated() {
272 ctx.clone().with_context(|| op_b(b))
273 } else {
274 op_b(b)
275 }
276 },
277 )
278}
279
280/// Rayon scope with local context.
281///
282/// This function captures the [`LocalContext`] of the calling thread and propagates it to the threads that run the
283/// operations.
284///
285/// See `rayon::scope` for more details about scope.
286///
287/// [`LocalContext`]: zng_app_context::LocalContext
288pub fn scope<'scope, OP, R>(op: OP) -> R
289where
290 OP: FnOnce(ScopeCtx<'_, 'scope>) -> R + Send,
291 R: Send,
292{
293 let ctx = LocalContext::capture();
294
295 // Cast `&'_ ctx` to `&'scope ctx` to "inject" the context in the scope.
296 // Is there a better way to do this? I hope so.
297 //
298 // SAFETY:
299 // * We are extending `'_` to `'scope`, that is one of the documented valid usages of `transmute`.
300 // * No use after free because `rayon::scope` joins all threads before returning and we only drop `ctx` after.
301 let ctx_ref: &'_ LocalContext = &ctx;
302 let ctx_scope_ref: &'scope LocalContext = unsafe { std::mem::transmute(ctx_ref) };
303
304 let r = rayon::scope(move |s| {
305 op(ScopeCtx {
306 scope: s,
307 ctx: ctx_scope_ref,
308 })
309 });
310
311 drop(ctx);
312
313 r
314}
315
316/// Represents a fork-join scope which can be used to spawn any number of tasks that run in the caller's thread context.
317///
318/// See [`scope`] for more details.
319#[derive(Clone, Copy, Debug)]
320pub struct ScopeCtx<'a, 'scope: 'a> {
321 scope: &'a rayon::Scope<'scope>,
322 ctx: &'scope LocalContext,
323}
324impl<'a, 'scope: 'a> ScopeCtx<'a, 'scope> {
325 /// Spawns a job into the fork-join scope `self`. The job runs in the captured thread context.
326 ///
327 /// See `rayon::Scope::spawn` for more details.
328 pub fn spawn<F>(self, f: F)
329 where
330 F: FnOnce(ScopeCtx<'_, 'scope>) + Send + 'scope,
331 {
332 let ctx = self.ctx;
333 self.scope
334 .spawn(move |s| ctx.clone().with_context(move || f(ScopeCtx { scope: s, ctx })));
335 }
336}
337
338/// Spawn a parallel async task that can also be `.await` for the task result.
339///
340/// # Parallel
341///
342/// The task runs in the primary [`rayon`] thread-pool, every [`poll`](Future::poll) happens inside a call to `rayon::spawn`.
343///
344/// You can use parallel iterators, `join` or any of rayon's utilities inside `task` to make it multi-threaded,
345/// otherwise it will run in a single thread at a time, still not blocking the UI.
346///
347/// The [`rayon`] crate is re-exported in `task::rayon` for convenience and compatibility.
348///
349/// # Async
350///
351/// The `task` is also a future so you can `.await`, after each `.await` the task continues executing in whatever `rayon` thread
352/// is free, so the `task` should either be doing CPU intensive work or awaiting, blocking IO operations
353/// block the thread from being used by other tasks reducing overall performance. You can use [`wait`] for IO
354/// or blocking operations and for networking you can use any of the async crates, as long as they start their own *event reactor*.
355///
356/// The `task` lives inside the [`Waker`] when awaiting and inside `rayon::spawn` when running.
357///
358/// # Examples
359///
360/// ```
361/// # use zng_task::{self as task, rayon::iter::*};
362/// # struct SomeStruct { sum: usize }
363/// # async fn read_numbers() -> Vec<usize> { vec![] }
364/// # impl SomeStruct {
365/// async fn on_event(&mut self) {
366/// self.sum = task::run(async {
367/// read_numbers().await.par_iter().map(|i| i * i).sum()
368/// }).await;
369/// }
370/// # }
371/// ```
372///
373/// The example `.await` for some numbers and then uses a parallel iterator to compute a result, this all runs in parallel
374/// because it is inside a `run` task. The task result is then `.await` inside one of the UI async tasks. Note that the
375/// task captures the caller [`LocalContext`] so you can interact with variables and UI services directly inside the task too.
376///
377/// # Cancellation
378///
379/// The task starts running immediately, awaiting the returned future merely awaits for a message from the worker threads and
380/// that means the `task` future is not owned by the returned future. Usually to *cancel* a future you only need to drop it,
381/// in this task dropping the returned future will only drop the `task` once it reaches a `.await` point and detects that the
382/// result channel is disconnected.
383///
384/// If you want to deterministically known that the `task` was cancelled use a cancellation signal.
385///
386/// # Panic Propagation
387///
388/// If the `task` panics the panic is resumed in the awaiting thread using [`resume_unwind`]. You
389/// can use [`run_catch`] to get the panic as an error instead.
390///
391/// [`resume_unwind`]: panic::resume_unwind
392/// [`Waker`]: std::task::Waker
393/// [`rayon`]: https://docs.rs/rayon
394/// [`LocalContext`]: zng_app_context::LocalContext
395pub async fn run<R, T>(task: impl IntoFuture<IntoFuture = T>) -> R
396where
397 R: Send + 'static,
398 T: Future<Output = R> + Send + 'static,
399{
400 match run_catch(task).await {
401 Ok(r) => r,
402 Err(p) => panic::resume_unwind(p),
403 }
404}
405
406/// Like [`run`] but catches panics.
407///
408/// This task works the same and has the same utility as [`run`], except if returns panic messages
409/// as an error instead of propagating the panic.
410///
411/// # Unwind Safety
412///
413/// This function disables the [unwind safety validation], meaning that in case of a panic shared
414/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
415/// poisoning mutexes or atomics to mutate shared data or discard all shared data used in the `task`
416/// if this function returns an error.
417///
418/// [unwind safety validation]: std::panic::UnwindSafe
419pub async fn run_catch<R, T>(task: impl IntoFuture<IntoFuture = T>) -> PanicResult<R>
420where
421 R: Send + 'static,
422 T: Future<Output = R> + Send + 'static,
423{
424 type Fut<R> = Pin<Box<dyn Future<Output = R> + Send>>;
425
426 // A future that is its own waker that polls inside the rayon primary thread-pool.
427 struct RayonCatchTask<R> {
428 ctx: LocalContext,
429 fut: Mutex<Option<Fut<R>>>,
430 sender: flume::Sender<PanicResult<R>>,
431 }
432 impl<R: Send + 'static> RayonCatchTask<R> {
433 fn poll(self: Arc<Self>) {
434 let sender = self.sender.clone();
435 if sender.is_disconnected() {
436 return; // cancel.
437 }
438 rayon::spawn(move || {
439 // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
440 let mut task = self.fut.lock();
441 if let Some(mut t) = task.take() {
442 let waker = self.clone().into();
443 let mut cx = std::task::Context::from_waker(&waker);
444
445 self.ctx.clone().with_context(|| {
446 let r = panic::catch_unwind(panic::AssertUnwindSafe(|| t.as_mut().poll(&mut cx)));
447 match r {
448 Ok(Poll::Ready(r)) => {
449 drop(task);
450 let _ = sender.send(Ok(r));
451 }
452 Ok(Poll::Pending) => {
453 *task = Some(t);
454 }
455 Err(p) => {
456 drop(task);
457 let _ = sender.send(Err(p));
458 }
459 }
460 });
461 }
462 })
463 }
464 }
465 impl<R: Send + 'static> std::task::Wake for RayonCatchTask<R> {
466 fn wake(self: Arc<Self>) {
467 self.poll()
468 }
469 }
470
471 let (sender, receiver) = channel::bounded(1);
472
473 Arc::new(RayonCatchTask {
474 ctx: LocalContext::capture(),
475 fut: Mutex::new(Some(Box::pin(task.into_future()))),
476 sender: sender.into(),
477 })
478 .poll();
479
480 receiver.recv().await.unwrap()
481}
482
483/// Spawn a parallel async task that will send its result to a [`ResponseVar<R>`].
484///
485/// The [`run`] documentation explains how `task` is *parallel* and *async*. The `task` starts executing immediately.
486///
487/// # Examples
488///
489/// ```
490/// # use zng_task::{self as task, rayon::iter::*};
491/// # use zng_var::*;
492/// # struct SomeStruct { sum_response: ResponseVar<usize> }
493/// # async fn read_numbers() -> Vec<usize> { vec![] }
494/// # impl SomeStruct {
495/// fn on_event(&mut self) {
496/// self.sum_response = task::respond(async {
497/// read_numbers().await.par_iter().map(|i| i * i).sum()
498/// });
499/// }
500///
501/// fn on_update(&mut self) {
502/// if let Some(result) = self.sum_response.rsp_new() {
503/// println!("sum of squares: {result}");
504/// }
505/// }
506/// # }
507/// ```
508///
509/// The example `.await` for some numbers and then uses a parallel iterator to compute a result. The result is send to
510/// `sum_response` that is a [`ResponseVar<R>`].
511///
512/// # Cancellation
513///
514/// Dropping the [`ResponseVar<R>`] does not cancel the `task`, it will still run to completion.
515///
516/// # Panic Handling
517///
518/// If the `task` panics the panic is logged as an error and resumed in the response var modify closure.
519///
520/// [`resume_unwind`]: panic::resume_unwind
521/// [`ResponseVar<R>`]: zng_var::ResponseVar
522/// [`response_var`]: zng_var::response_var
523pub fn respond<R, F>(task: F) -> ResponseVar<R>
524where
525 R: VarValue,
526 F: Future<Output = R> + Send + 'static,
527{
528 type Fut<R> = Pin<Box<dyn Future<Output = R> + Send>>;
529
530 let (responder, response) = response_var();
531
532 // A future that is its own waker that polls inside the rayon primary thread-pool.
533 struct RayonRespondTask<R: VarValue> {
534 ctx: LocalContext,
535 fut: Mutex<Option<Fut<R>>>,
536 responder: zng_var::ResponderVar<R>,
537 }
538 impl<R: VarValue> RayonRespondTask<R> {
539 fn poll(self: Arc<Self>) {
540 let responder = self.responder.clone();
541 if responder.strong_count() == 2 {
542 return; // cancel.
543 }
544 rayon::spawn(move || {
545 // this `Option<Fut>` dance is used to avoid a `poll` after `Ready` or panic.
546 let mut task = self.fut.lock();
547 if let Some(mut t) = task.take() {
548 let waker = self.clone().into();
549 let mut cx = std::task::Context::from_waker(&waker);
550
551 self.ctx.clone().with_context(|| {
552 let r = panic::catch_unwind(panic::AssertUnwindSafe(|| t.as_mut().poll(&mut cx)));
553 match r {
554 Ok(Poll::Ready(r)) => {
555 drop(task);
556
557 responder.respond(r);
558 }
559 Ok(Poll::Pending) => {
560 *task = Some(t);
561 }
562 Err(p) => {
563 tracing::error!("panic in `task::respond`: {}", crate_util::panic_str(&p));
564 drop(task);
565 responder.modify(move |_| panic::resume_unwind(p));
566 }
567 }
568 });
569 }
570 })
571 }
572 }
573 impl<R: VarValue> std::task::Wake for RayonRespondTask<R> {
574 fn wake(self: Arc<Self>) {
575 self.poll()
576 }
577 }
578
579 Arc::new(RayonRespondTask {
580 ctx: LocalContext::capture(),
581 fut: Mutex::new(Some(Box::pin(task))),
582 responder,
583 })
584 .poll();
585
586 response
587}
588
589/// Polls the `task` once immediately on the calling thread, if the `task` is ready returns the response already set,
590/// if the `task` is pending continues execution like [`respond`].
591pub fn poll_respond<R, F>(task: impl IntoFuture<IntoFuture = F>) -> ResponseVar<R>
592where
593 R: VarValue,
594 F: Future<Output = R> + Send + 'static,
595{
596 enum QuickResponse<R: VarValue> {
597 Quick(Option<R>),
598 Response(zng_var::ResponderVar<R>),
599 }
600 let task = task.into_future();
601 let q = Arc::new(Mutex::new(QuickResponse::Quick(None)));
602 poll_spawn(zng_clone_move::async_clmv!(q, {
603 let rsp = task.await;
604
605 match &mut *q.lock() {
606 QuickResponse::Quick(q) => *q = Some(rsp),
607 QuickResponse::Response(r) => r.respond(rsp),
608 }
609 }));
610
611 let mut q = q.lock();
612 match &mut *q {
613 QuickResponse::Quick(q) if q.is_some() => response_done_var(q.take().unwrap()),
614 _ => {
615 let (responder, response) = response_var();
616 *q = QuickResponse::Response(responder);
617 response
618 }
619 }
620}
621
622/// Create a parallel `task` that blocks awaiting for an IO operation, the `task` starts on the first `.await`.
623///
624/// # Parallel
625///
626/// The `task` runs in the [`blocking`] thread-pool which is optimized for awaiting blocking operations.
627/// If the `task` is computation heavy you should use [`run`] and then `wait` inside that task for the
628/// parts that are blocking.
629///
630/// # Examples
631///
632/// ```
633/// # fn main() { }
634/// # use zng_task as task;
635/// # async fn example() {
636/// task::wait(|| std::fs::read_to_string("file.txt")).await
637/// # ; }
638/// ```
639///
640/// The example reads a file, that is a blocking file IO operation, most of the time is spend waiting for the operating system,
641/// so we offload this to a `wait` task. The task can be `.await` inside a [`run`] task or inside one of the UI tasks
642/// like in a async event handler.
643///
644/// # Async Read/Write
645///
646/// For [`std::io::Read`] and [`std::io::Write`] operations you can also use [`io`] and [`fs`] alternatives when you don't
647/// have or want the full file in memory or when you want to apply multiple operations to the file.
648///
649/// # Panic Propagation
650///
651/// If the `task` panics the panic is resumed in the awaiting thread using [`resume_unwind`]. You
652/// can use [`wait_catch`] to get the panic as an error instead.
653///
654/// [`blocking`]: https://docs.rs/blocking
655/// [`resume_unwind`]: panic::resume_unwind
656pub async fn wait<T, F>(task: F) -> T
657where
658 F: FnOnce() -> T + Send + 'static,
659 T: Send + 'static,
660{
661 match wait_catch(task).await {
662 Ok(r) => r,
663 Err(p) => panic::resume_unwind(p),
664 }
665}
666
667/// Like [`wait`] but catches panics.
668///
669/// This task works the same and has the same utility as [`wait`], except if returns panic messages
670/// as an error instead of propagating the panic.
671///
672/// # Unwind Safety
673///
674/// This function disables the [unwind safety validation], meaning that in case of a panic shared
675/// data can end-up in an invalid, but still memory safe, state. If you are worried about that only use
676/// poisoning mutexes or atomics to mutate shared data or discard all shared data used in the `task`
677/// if this function returns an error.
678///
679/// [unwind safety validation]: std::panic::UnwindSafe
680pub async fn wait_catch<T, F>(task: F) -> PanicResult<T>
681where
682 F: FnOnce() -> T + Send + 'static,
683 T: Send + 'static,
684{
685 let mut ctx = LocalContext::capture();
686 blocking::unblock(move || ctx.with_context(move || panic::catch_unwind(panic::AssertUnwindSafe(task)))).await
687}
688
689/// Fire and forget a [`wait`] task. The `task` starts executing immediately.
690///
691/// # Panic Handling
692///
693/// If the `task` panics the panic message is logged as an error, the panic is otherwise ignored.
694///
695/// # Unwind Safety
696///
697/// This function disables the [unwind safety validation], meaning that in case of a panic shared
698/// data can end-up in an invalid (still memory safe) state. If you are worried about that only use
699/// poisoning mutexes or atomics to mutate shared data or use [`wait_catch`] to detect a panic or [`wait`]
700/// to propagate a panic.
701///
702/// [unwind safety validation]: std::panic::UnwindSafe
703pub fn spawn_wait<F>(task: F)
704where
705 F: FnOnce() + Send + 'static,
706{
707 spawn(async move {
708 if let Err(p) = wait_catch(task).await {
709 tracing::error!("parallel `spawn_wait` task panicked: {}", crate_util::panic_str(&p))
710 }
711 });
712}
713
714/// Like [`spawn_wait`], but the task will send its result to a [`ResponseVar<R>`].
715///
716/// # Cancellation
717///
718/// Dropping the [`ResponseVar<R>`] does not cancel the `task`, it will still run to completion.
719///
720/// # Panic Handling
721///
722/// If the `task` panics the panic is logged as an error and resumed in the response var modify closure.
723pub fn wait_respond<R, F>(task: F) -> ResponseVar<R>
724where
725 R: VarValue,
726 F: FnOnce() -> R + Send + 'static,
727{
728 let (responder, response) = response_var();
729 spawn_wait(move || match panic::catch_unwind(panic::AssertUnwindSafe(task)) {
730 Ok(r) => responder.respond(r),
731 Err(p) => {
732 tracing::error!("panic in `task::wait_respond`: {}", crate_util::panic_str(&p));
733 responder.modify(move |_| panic::resume_unwind(p))
734 }
735 });
736 response
737}
738
739/// Blocks the thread until the `task` future finishes.
740///
741/// This function is useful for implementing async tests, using it in an app will probably cause
742/// the app to stop responding.
743///
744/// The crate [`futures-lite`] is used to execute the task.
745///
746/// # Examples
747///
748/// Test a [`run`] call:
749///
750/// ```
751/// use zng_task as task;
752/// # use zng_unit::*;
753/// # async fn foo(u: u8) -> Result<u8, ()> { task::deadline(1.ms()).await; Ok(u) }
754///
755/// #[test]
756/// # fn __() { }
757/// pub fn run_ok() {
758/// let r = task::block_on(task::run(async {
759/// foo(32).await
760/// }));
761///
762/// # let value =
763/// r.expect("foo(32) was not Ok");
764/// # assert_eq!(32, value);
765/// }
766/// # run_ok();
767/// ```
768///
769/// [`futures-lite`]: https://docs.rs/futures-lite/
770pub fn block_on<F>(task: impl IntoFuture<IntoFuture = F>) -> F::Output
771where
772 F: Future,
773{
774 futures_lite::future::block_on(task.into_future())
775}
776
777/// Continuous poll the `task` until if finishes.
778///
779/// This function is useful for implementing some async tests only, futures don't expect to be polled
780/// continuously. This function is only available in test builds.
781#[cfg(any(test, doc, feature = "test_util"))]
782pub fn spin_on<F>(task: impl IntoFuture<IntoFuture = F>) -> F::Output
783where
784 F: Future,
785{
786 use std::pin::pin;
787
788 let mut task = pin!(task.into_future());
789 block_on(future_fn(|cx| match task.as_mut().poll(cx) {
790 Poll::Ready(r) => Poll::Ready(r),
791 Poll::Pending => {
792 cx.waker().wake_by_ref();
793 Poll::Pending
794 }
795 }))
796}
797
798/// Executor used in async doc tests.
799///
800/// If `spin` is `true` the [`spin_on`] executor is used with a timeout of 500 milliseconds.
801/// IF `spin` is `false` the [`block_on`] executor is used with a timeout of 5 seconds.
802#[cfg(any(test, doc, feature = "test_util"))]
803pub fn doc_test<F>(spin: bool, task: impl IntoFuture<IntoFuture = F>) -> F::Output
804where
805 F: Future,
806{
807 use zng_unit::TimeUnits;
808
809 if spin {
810 spin_on(with_deadline(task, 500.ms())).expect("async doc-test timeout")
811 } else {
812 block_on(with_deadline(task, 5.secs())).expect("async doc-test timeout")
813 }
814}
815
816/// A future that is [`Pending`] once and wakes the current task.
817///
818/// After the first `.await` the future is always [`Ready`] and on the first `.await` it calls [`wake`].
819///
820/// [`Pending`]: std::task::Poll::Pending
821/// [`Ready`]: std::task::Poll::Ready
822/// [`wake`]: std::task::Waker::wake
823pub async fn yield_now() {
824 struct YieldNowFut(bool);
825 impl Future for YieldNowFut {
826 type Output = ();
827
828 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
829 if self.0 {
830 Poll::Ready(())
831 } else {
832 self.0 = true;
833 cx.waker().wake_by_ref();
834 Poll::Pending
835 }
836 }
837 }
838
839 YieldNowFut(false).await
840}
841
842/// A future that is [`Pending`] until the `deadline` is reached.
843///
844/// # Examples
845///
846/// Await 5 seconds in a [`spawn`] parallel task:
847///
848/// ```
849/// use zng_task as task;
850/// use zng_unit::*;
851///
852/// task::spawn(async {
853/// println!("waiting 5 seconds..");
854/// task::deadline(5.secs()).await;
855/// println!("5 seconds elapsed.")
856/// });
857/// ```
858///
859/// The future runs on an app provider timer executor, or on the [`futures_timer`] by default.
860///
861/// Note that deadlines from [`Duration`](std::time::Duration) starts *counting* at the moment this function is called,
862/// not at the moment of the first `.await` call.
863///
864/// [`Pending`]: std::task::Poll::Pending
865/// [`futures_timer`]: https://docs.rs/futures-timer
866pub fn deadline(deadline: impl Into<Deadline>) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
867 let deadline = deadline.into();
868 if zng_app_context::LocalContext::current_app().is_some() {
869 DEADLINE_SV.read().0(deadline)
870 } else {
871 default_deadline(deadline)
872 }
873}
874
875app_local! {
876 static DEADLINE_SV: (DeadlineService, bool) = const { (default_deadline, false) };
877}
878
879type DeadlineService = fn(Deadline) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
880
881fn default_deadline(deadline: Deadline) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
882 if let Some(timeout) = deadline.time_left() {
883 Box::pin(futures_timer::Delay::new(timeout))
884 } else {
885 Box::pin(std::future::ready(()))
886 }
887}
888
889/// Deadline APP integration.
890#[expect(non_camel_case_types)]
891pub struct DEADLINE_APP;
892
893impl DEADLINE_APP {
894 /// Called by the app implementer to setup the [`deadline`] executor.
895 ///
896 /// If no app calls this the [`futures_timer`] executor is used.
897 ///
898 /// [`futures_timer`]: https://docs.rs/futures-timer
899 ///
900 /// # Panics
901 ///
902 /// Panics if called more than once for the same app.
903 pub fn init_deadline_service(&self, service: DeadlineService) {
904 let (prev, already_set) = mem::replace(&mut *DEADLINE_SV.write(), (service, true));
905 if already_set {
906 *DEADLINE_SV.write() = (prev, true);
907 panic!("deadline service already inited for this app");
908 }
909 }
910}
911
912/// Implements a [`Future`] from a closure.
913///
914/// # Examples
915///
916/// A future that is ready with a closure returns `Some(R)`.
917///
918/// ```
919/// use zng_task as task;
920/// use std::task::Poll;
921///
922/// async fn ready_some<R>(mut closure: impl FnMut() -> Option<R>) -> R {
923/// task::future_fn(|cx| {
924/// match closure() {
925/// Some(r) => Poll::Ready(r),
926/// None => Poll::Pending
927/// }
928/// }).await
929/// }
930/// ```
931pub async fn future_fn<T, F>(fn_: F) -> T
932where
933 F: FnMut(&mut std::task::Context) -> Poll<T>,
934{
935 struct PollFn<F>(F);
936 impl<F> Unpin for PollFn<F> {}
937 impl<T, F: FnMut(&mut std::task::Context<'_>) -> Poll<T>> Future for PollFn<F> {
938 type Output = T;
939
940 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
941 (self.0)(cx)
942 }
943 }
944 PollFn(fn_).await
945}
946
947/// Error when [`with_deadline`] reach a time limit before a task finishes.
948#[derive(Debug, Clone, Copy)]
949pub struct DeadlineError {}
950impl fmt::Display for DeadlineError {
951 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
952 write!(f, "reached deadline")
953 }
954}
955impl std::error::Error for DeadlineError {}
956
957/// Add a [`deadline`] to a future.
958///
959/// Returns the `fut` output or [`DeadlineError`] if the deadline elapses first.
960pub async fn with_deadline<O, F: Future<Output = O>>(
961 fut: impl IntoFuture<IntoFuture = F>,
962 deadline: impl Into<Deadline>,
963) -> Result<F::Output, DeadlineError> {
964 let deadline = deadline.into();
965 any!(async { Ok(fut.await) }, async {
966 self::deadline(deadline).await;
967 Err(DeadlineError {})
968 })
969 .await
970}
971
972/// <span data-del-macro-root></span> A future that *zips* other futures.
973///
974/// The macro input is a comma separated list of future expressions. The macro output is a future
975/// that when ".awaited" produces a tuple of results in the same order as the inputs.
976///
977/// At least one input future is required and any number of futures is accepted. For more than
978/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
979/// some IDEs.
980///
981/// Each input must implement [`IntoFuture`]. Note that each input must be known at compile time, use the [`fn@all`] async
982/// function to await on all futures in a dynamic list of futures.
983///
984/// # Examples
985///
986/// Await for three different futures to complete:
987///
988/// ```
989/// use zng_task as task;
990///
991/// # task::doc_test(false, async {
992/// let (a, b, c) = task::all!(
993/// task::run(async { 'a' }),
994/// task::wait(|| "b"),
995/// async { b"c" }
996/// ).await;
997/// # });
998/// ```
999#[macro_export]
1000macro_rules! all {
1001 ($fut0:expr $(,)?) => { $crate::__all! { fut0: $fut0; } };
1002 ($fut0:expr, $fut1:expr $(,)?) => {
1003 $crate::__all! {
1004 fut0: $fut0;
1005 fut1: $fut1;
1006 }
1007 };
1008 ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1009 $crate::__all! {
1010 fut0: $fut0;
1011 fut1: $fut1;
1012 fut2: $fut2;
1013 }
1014 };
1015 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1016 $crate::__all! {
1017 fut0: $fut0;
1018 fut1: $fut1;
1019 fut2: $fut2;
1020 fut3: $fut3;
1021 }
1022 };
1023 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1024 $crate::__all! {
1025 fut0: $fut0;
1026 fut1: $fut1;
1027 fut2: $fut2;
1028 fut3: $fut3;
1029 fut4: $fut4;
1030 }
1031 };
1032 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1033 $crate::__all! {
1034 fut0: $fut0;
1035 fut1: $fut1;
1036 fut2: $fut2;
1037 fut3: $fut3;
1038 fut4: $fut4;
1039 fut5: $fut5;
1040 }
1041 };
1042 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1043 $crate::__all! {
1044 fut0: $fut0;
1045 fut1: $fut1;
1046 fut2: $fut2;
1047 fut3: $fut3;
1048 fut4: $fut4;
1049 fut5: $fut5;
1050 fut6: $fut6;
1051 }
1052 };
1053 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1054 $crate::__all! {
1055 fut0: $fut0;
1056 fut1: $fut1;
1057 fut2: $fut2;
1058 fut3: $fut3;
1059 fut4: $fut4;
1060 fut5: $fut5;
1061 fut6: $fut6;
1062 fut7: $fut7;
1063 }
1064 };
1065 ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all; $($fut),+ } }
1066}
1067
1068#[doc(hidden)]
1069#[macro_export]
1070macro_rules! __all {
1071 ($($ident:ident: $fut:expr;)+) => {
1072 {
1073 $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
1074 $crate::future_fn(move |cx| {
1075 use std::task::Poll;
1076
1077 let mut pending = false;
1078
1079 $(
1080 if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
1081 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1082 // Future::poll call, so it will not move.
1083 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1084 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1085 $ident = $crate::FutureOrOutput::Output(r);
1086 } else {
1087 pending = true;
1088 }
1089 }
1090 )+
1091
1092 if pending {
1093 Poll::Pending
1094 } else {
1095 Poll::Ready(($($ident.take_output()),+))
1096 }
1097 })
1098 }
1099 }
1100}
1101
1102#[doc(hidden)]
1103pub enum FutureOrOutput<F: Future> {
1104 Future(F),
1105 Output(F::Output),
1106 Taken,
1107}
1108impl<F: Future> FutureOrOutput<F> {
1109 pub fn take_output(&mut self) -> F::Output {
1110 match std::mem::replace(self, Self::Taken) {
1111 FutureOrOutput::Output(o) => o,
1112 _ => unreachable!(),
1113 }
1114 }
1115}
1116
1117/// A future that awaits on all `futures` at the same time and returns all results when all futures are ready.
1118///
1119/// This is the dynamic version of [`all!`].
1120pub async fn all<F: IntoFuture>(futures: impl IntoIterator<Item = F>) -> Vec<F::Output> {
1121 let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1122 future_fn(move |cx| {
1123 let mut pending = false;
1124 for input in &mut futures {
1125 if let FutureOrOutput::Future(fut) = input {
1126 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1127 // Future::poll call, so it will not move.
1128 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1129 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1130 *input = FutureOrOutput::Output(r);
1131 } else {
1132 pending = true;
1133 }
1134 }
1135 }
1136
1137 if pending {
1138 Poll::Pending
1139 } else {
1140 Poll::Ready(futures.iter_mut().map(FutureOrOutput::take_output).collect())
1141 }
1142 })
1143 .await
1144}
1145
1146/// <span data-del-macro-root></span> A future that awaits for the first future that is ready.
1147///
1148/// The macro input is comma separated list of future expressions, the futures must
1149/// all have the same output type. The macro output is a future that when ".awaited" produces
1150/// a single output type instance returned by the first input future that completes.
1151///
1152/// At least one input future is required and any number of futures is accepted. For more than
1153/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
1154/// some IDEs.
1155///
1156/// If two futures are ready at the same time the result of the first future in the input list is used.
1157/// After one future is ready the other futures are not polled again and are dropped.
1158///
1159/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
1160/// known at compile time, use the [`fn@any`] async function to await on all futures in a dynamic list of futures.
1161///
1162/// # Examples
1163///
1164/// Await for the first of three futures to complete:
1165///
1166/// ```
1167/// use zng_task as task;
1168/// use zng_unit::*;
1169///
1170/// # task::doc_test(false, async {
1171/// let r = task::any!(
1172/// task::run(async { task::deadline(300.ms()).await; 'a' }),
1173/// task::wait(|| 'b'),
1174/// async { task::deadline(300.ms()).await; 'c' }
1175/// ).await;
1176///
1177/// assert_eq!('b', r);
1178/// # });
1179/// ```
1180#[macro_export]
1181macro_rules! any {
1182 ($fut0:expr $(,)?) => { $crate::__any! { fut0: $fut0; } };
1183 ($fut0:expr, $fut1:expr $(,)?) => {
1184 $crate::__any! {
1185 fut0: $fut0;
1186 fut1: $fut1;
1187 }
1188 };
1189 ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1190 $crate::__any! {
1191 fut0: $fut0;
1192 fut1: $fut1;
1193 fut2: $fut2;
1194 }
1195 };
1196 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1197 $crate::__any! {
1198 fut0: $fut0;
1199 fut1: $fut1;
1200 fut2: $fut2;
1201 fut3: $fut3;
1202 }
1203 };
1204 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1205 $crate::__any! {
1206 fut0: $fut0;
1207 fut1: $fut1;
1208 fut2: $fut2;
1209 fut3: $fut3;
1210 fut4: $fut4;
1211 }
1212 };
1213 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1214 $crate::__any! {
1215 fut0: $fut0;
1216 fut1: $fut1;
1217 fut2: $fut2;
1218 fut3: $fut3;
1219 fut4: $fut4;
1220 fut5: $fut5;
1221 }
1222 };
1223 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1224 $crate::__any! {
1225 fut0: $fut0;
1226 fut1: $fut1;
1227 fut2: $fut2;
1228 fut3: $fut3;
1229 fut4: $fut4;
1230 fut5: $fut5;
1231 fut6: $fut6;
1232 }
1233 };
1234 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1235 $crate::__any! {
1236 fut0: $fut0;
1237 fut1: $fut1;
1238 fut2: $fut2;
1239 fut3: $fut3;
1240 fut4: $fut4;
1241 fut5: $fut5;
1242 fut6: $fut6;
1243 fut7: $fut7;
1244 }
1245 };
1246 ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any; $($fut),+ } }
1247}
1248#[doc(hidden)]
1249#[macro_export]
1250macro_rules! __any {
1251 ($($ident:ident: $fut:expr;)+) => {
1252 {
1253 $(let mut $ident = std::future::IntoFuture::into_future($fut);)+
1254 $crate::future_fn(move |cx| {
1255 use std::task::Poll;
1256 $(
1257 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1258 // Future::poll call, so it will not move.
1259 let mut $ident = unsafe { std::pin::Pin::new_unchecked(&mut $ident) };
1260 if let Poll::Ready(r) = $ident.as_mut().poll(cx) {
1261 return Poll::Ready(r)
1262 }
1263 )+
1264
1265 Poll::Pending
1266 })
1267 }
1268 }
1269}
1270#[doc(hidden)]
1271pub use zng_task_proc_macros::task_any_all as __proc_any_all;
1272
1273/// A future that awaits on all `futures` at the same time and returns the first result when the first future is ready.
1274///
1275/// This is the dynamic version of [`any!`].
1276pub async fn any<F: IntoFuture>(futures: impl IntoIterator<Item = F>) -> F::Output {
1277 let mut futures: Vec<_> = futures.into_iter().map(IntoFuture::into_future).collect();
1278 future_fn(move |cx| {
1279 for fut in &mut futures {
1280 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1281 // Future::poll call, so it will not move.
1282 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1283 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1284 return Poll::Ready(r);
1285 }
1286 }
1287 Poll::Pending
1288 })
1289 .await
1290}
1291
1292/// <span data-del-macro-root></span> A future that waits for the first future that is ready with an `Ok(T)` result.
1293///
1294/// The macro input is comma separated list of future expressions, the futures must
1295/// all have the same output `Result<T, E>` type, but each can have a different `E`. The macro output is a future
1296/// that when ".awaited" produces a single output of type `Result<T, (E0, E1, ..)>` that is `Ok(T)` if any of the futures
1297/// is `Ok(T)` or is `Err((E0, E1, ..))` is all futures are `Err`.
1298///
1299/// At least one input future is required and any number of futures is accepted. For more than
1300/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
1301/// some IDEs.
1302///
1303/// If two futures are ready and `Ok(T)` at the same time the result of the first future in the input list is used.
1304/// After one future is ready and `Ok(T)` the other futures are not polled again and are dropped. After a future
1305/// is ready and `Err(E)` it is also not polled again and dropped.
1306///
1307/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
1308/// known at compile time, use the [`fn@any_ok`] async function to await on all futures in a dynamic list of futures.
1309///
1310/// # Examples
1311///
1312/// Await for the first of three futures to complete with `Ok`:
1313///
1314/// ```
1315/// use zng_task as task;
1316/// # #[derive(Debug, PartialEq)]
1317/// # pub struct FooError;
1318/// # task::doc_test(false, async {
1319/// let r = task::any_ok!(
1320/// task::run(async { Err::<char, _>("error") }),
1321/// task::wait(|| Ok::<_, FooError>('b')),
1322/// async { Err::<char, _>(FooError) }
1323/// ).await;
1324///
1325/// assert_eq!(Ok('b'), r);
1326/// # });
1327/// ```
1328#[macro_export]
1329macro_rules! any_ok {
1330 ($fut0:expr $(,)?) => { $crate::__any_ok! { fut0: $fut0; } };
1331 ($fut0:expr, $fut1:expr $(,)?) => {
1332 $crate::__any_ok! {
1333 fut0: $fut0;
1334 fut1: $fut1;
1335 }
1336 };
1337 ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1338 $crate::__any_ok! {
1339 fut0: $fut0;
1340 fut1: $fut1;
1341 fut2: $fut2;
1342 }
1343 };
1344 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1345 $crate::__any_ok! {
1346 fut0: $fut0;
1347 fut1: $fut1;
1348 fut2: $fut2;
1349 fut3: $fut3;
1350 }
1351 };
1352 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1353 $crate::__any_ok! {
1354 fut0: $fut0;
1355 fut1: $fut1;
1356 fut2: $fut2;
1357 fut3: $fut3;
1358 fut4: $fut4;
1359 }
1360 };
1361 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1362 $crate::__any_ok! {
1363 fut0: $fut0;
1364 fut1: $fut1;
1365 fut2: $fut2;
1366 fut3: $fut3;
1367 fut4: $fut4;
1368 fut5: $fut5;
1369 }
1370 };
1371 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1372 $crate::__any_ok! {
1373 fut0: $fut0;
1374 fut1: $fut1;
1375 fut2: $fut2;
1376 fut3: $fut3;
1377 fut4: $fut4;
1378 fut5: $fut5;
1379 fut6: $fut6;
1380 }
1381 };
1382 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1383 $crate::__any_ok! {
1384 fut0: $fut0;
1385 fut1: $fut1;
1386 fut2: $fut2;
1387 fut3: $fut3;
1388 fut4: $fut4;
1389 fut5: $fut5;
1390 fut6: $fut6;
1391 fut7: $fut7;
1392 }
1393 };
1394 ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any_ok; $($fut),+ } }
1395}
1396
1397#[doc(hidden)]
1398#[macro_export]
1399macro_rules! __any_ok {
1400 ($($ident:ident: $fut: expr;)+) => {
1401 {
1402 $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
1403 $crate::future_fn(move |cx| {
1404 use std::task::Poll;
1405
1406 let mut pending = false;
1407
1408 $(
1409 if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
1410 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1411 // Future::poll call, so it will not move.
1412 let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
1413 if let Poll::Ready(r) = fut.as_mut().poll(cx) {
1414 match r {
1415 Ok(r) => return Poll::Ready(Ok(r)),
1416 Err(e) => {
1417 $ident = $crate::FutureOrOutput::Output(Err(e));
1418 }
1419 }
1420 } else {
1421 pending = true;
1422 }
1423 }
1424 )+
1425
1426 if pending {
1427 Poll::Pending
1428 } else {
1429 Poll::Ready(Err((
1430 $($ident.take_output().unwrap_err()),+
1431 )))
1432 }
1433 })
1434 }
1435 }
1436}
1437
1438/// A future that awaits on all `futures` at the same time and returns when any future is `Ok(_)` or all are `Err(_)`.
1439///
1440/// This is the dynamic version of [`all_some!`].
1441pub async fn any_ok<Ok, Err, F: IntoFuture<Output = Result<Ok, Err>>>(futures: impl IntoIterator<Item = F>) -> Result<Ok, Vec<Err>> {
1442 let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1443 future_fn(move |cx| {
1444 let mut pending = false;
1445 for input in &mut futures {
1446 if let FutureOrOutput::Future(fut) = input {
1447 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1448 // Future::poll call, so it will not move.
1449 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1450 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1451 match r {
1452 Ok(r) => return Poll::Ready(Ok(r)),
1453 Err(e) => *input = FutureOrOutput::Output(Err(e)),
1454 }
1455 } else {
1456 pending = true;
1457 }
1458 }
1459 }
1460
1461 if pending {
1462 Poll::Pending
1463 } else {
1464 Poll::Ready(Err(futures
1465 .iter_mut()
1466 .map(|f| match f.take_output() {
1467 Ok(_) => unreachable!(),
1468 Err(e) => e,
1469 })
1470 .collect()))
1471 }
1472 })
1473 .await
1474}
1475
1476/// <span data-del-macro-root></span> A future that is ready when any of the futures is ready and `Some(T)`.
1477///
1478/// The macro input is comma separated list of future expressions, the futures must
1479/// all have the same output `Option<T>` type. The macro output is a future that when ".awaited" produces
1480/// a single output type instance returned by the first input future that completes with a `Some`.
1481/// If all futures complete with a `None` the output is `None`.
1482///
1483/// At least one input future is required and any number of futures is accepted. For more than
1484/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
1485/// some IDEs.
1486///
1487/// If two futures are ready and `Some(T)` at the same time the result of the first future in the input list is used.
1488/// After one future is ready and `Some(T)` the other futures are not polled again and are dropped. After a future
1489/// is ready and `None` it is also not polled again and dropped.
1490///
1491/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
1492/// known at compile time, use the [`fn@any_some`] async function to await on all futures in a dynamic list of futures.
1493///
1494/// # Examples
1495///
1496/// Await for the first of three futures to complete with `Some`:
1497///
1498/// ```
1499/// use zng_task as task;
1500/// # task::doc_test(false, async {
1501/// let r = task::any_some!(
1502/// task::run(async { None::<char> }),
1503/// task::wait(|| Some('b')),
1504/// async { None::<char> }
1505/// ).await;
1506///
1507/// assert_eq!(Some('b'), r);
1508/// # });
1509/// ```
1510#[macro_export]
1511macro_rules! any_some {
1512 ($fut0:expr $(,)?) => { $crate::__any_some! { fut0: $fut0; } };
1513 ($fut0:expr, $fut1:expr $(,)?) => {
1514 $crate::__any_some! {
1515 fut0: $fut0;
1516 fut1: $fut1;
1517 }
1518 };
1519 ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1520 $crate::__any_some! {
1521 fut0: $fut0;
1522 fut1: $fut1;
1523 fut2: $fut2;
1524 }
1525 };
1526 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1527 $crate::__any_some! {
1528 fut0: $fut0;
1529 fut1: $fut1;
1530 fut2: $fut2;
1531 fut3: $fut3;
1532 }
1533 };
1534 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1535 $crate::__any_some! {
1536 fut0: $fut0;
1537 fut1: $fut1;
1538 fut2: $fut2;
1539 fut3: $fut3;
1540 fut4: $fut4;
1541 }
1542 };
1543 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1544 $crate::__any_some! {
1545 fut0: $fut0;
1546 fut1: $fut1;
1547 fut2: $fut2;
1548 fut3: $fut3;
1549 fut4: $fut4;
1550 fut5: $fut5;
1551 }
1552 };
1553 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1554 $crate::__any_some! {
1555 fut0: $fut0;
1556 fut1: $fut1;
1557 fut2: $fut2;
1558 fut3: $fut3;
1559 fut4: $fut4;
1560 fut5: $fut5;
1561 fut6: $fut6;
1562 }
1563 };
1564 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1565 $crate::__any_some! {
1566 fut0: $fut0;
1567 fut1: $fut1;
1568 fut2: $fut2;
1569 fut3: $fut3;
1570 fut4: $fut4;
1571 fut5: $fut5;
1572 fut6: $fut6;
1573 fut7: $fut7;
1574 }
1575 };
1576 ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__any_some; $($fut),+ } }
1577}
1578
1579#[doc(hidden)]
1580#[macro_export]
1581macro_rules! __any_some {
1582 ($($ident:ident: $fut: expr;)+) => {
1583 {
1584 $(let mut $ident = Some(std::future::IntoFuture::into_future($fut));)+
1585 $crate::future_fn(move |cx| {
1586 use std::task::Poll;
1587
1588 let mut pending = false;
1589
1590 $(
1591 if let Some(fut) = $ident.as_mut() {
1592 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1593 // Future::poll call, so it will not move.
1594 let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
1595 if let Poll::Ready(r) = fut.as_mut().poll(cx) {
1596 if let Some(r) = r {
1597 return Poll::Ready(Some(r));
1598 }
1599 $ident = None;
1600 } else {
1601 pending = true;
1602 }
1603 }
1604 )+
1605
1606 if pending {
1607 Poll::Pending
1608 } else {
1609 Poll::Ready(None)
1610 }
1611 })
1612 }
1613 }
1614}
1615
1616/// A future that awaits on all `futures` at the same time and returns when any future is `Some(_)` or all are `None`.
1617///
1618/// This is the dynamic version of [`all_some!`].
1619pub async fn any_some<Some, F: IntoFuture<Output = Option<Some>>>(futures: impl IntoIterator<Item = F>) -> Option<Some> {
1620 let mut futures: Vec<_> = futures.into_iter().map(|f| Some(f.into_future())).collect();
1621 future_fn(move |cx| {
1622 let mut pending = false;
1623 for input in &mut futures {
1624 if let Some(fut) = input {
1625 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1626 // Future::poll call, so it will not move.
1627 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1628 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1629 match r {
1630 Some(r) => return Poll::Ready(Some(r)),
1631 None => *input = None,
1632 }
1633 } else {
1634 pending = true;
1635 }
1636 }
1637 }
1638
1639 if pending { Poll::Pending } else { Poll::Ready(None) }
1640 })
1641 .await
1642}
1643
1644/// <span data-del-macro-root></span> A future that is ready when all futures are ready with an `Ok(T)` result or
1645/// any future is ready with an `Err(E)` result.
1646///
1647/// The output type is `Result<(T0, T1, ..), E>`, the `Ok` type is a tuple with all the `Ok` values, the error
1648/// type is the first error encountered, the input futures must have the same `Err` type but can have different
1649/// `Ok` types.
1650///
1651/// At least one input future is required and any number of futures is accepted. For more than
1652/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
1653/// some IDEs.
1654///
1655/// If two futures are ready and `Err(E)` at the same time the result of the first future in the input list is used.
1656/// After one future is ready and `Err(T)` the other futures are not polled again and are dropped. After a future
1657/// is ready it is also not polled again and dropped.
1658///
1659/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
1660/// known at compile time, use the [`fn@all_ok`] async function to await on all futures in a dynamic list of futures.
1661///
1662/// # Examples
1663///
1664/// Await for the first of three futures to complete with `Ok(T)`:
1665///
1666/// ```
1667/// use zng_task as task;
1668/// # #[derive(Debug, PartialEq)]
1669/// # struct FooError;
1670/// # task::doc_test(false, async {
1671/// let r = task::all_ok!(
1672/// task::run(async { Ok::<_, FooError>('a') }),
1673/// task::wait(|| Ok::<_, FooError>('b')),
1674/// async { Ok::<_, FooError>('c') }
1675/// ).await;
1676///
1677/// assert_eq!(Ok(('a', 'b', 'c')), r);
1678/// # });
1679/// ```
1680///
1681/// And in if any completes with `Err(E)`:
1682///
1683/// ```
1684/// use zng_task as task;
1685/// # #[derive(Debug, PartialEq)]
1686/// # struct FooError;
1687/// # task::doc_test(false, async {
1688/// let r = task::all_ok!(
1689/// task::run(async { Ok('a') }),
1690/// task::wait(|| Err::<char, _>(FooError)),
1691/// async { Ok('c') }
1692/// ).await;
1693///
1694/// assert_eq!(Err(FooError), r);
1695/// # });
1696/// ```
1697#[macro_export]
1698macro_rules! all_ok {
1699 ($fut0:expr $(,)?) => { $crate::__all_ok! { fut0: $fut0; } };
1700 ($fut0:expr, $fut1:expr $(,)?) => {
1701 $crate::__all_ok! {
1702 fut0: $fut0;
1703 fut1: $fut1;
1704 }
1705 };
1706 ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1707 $crate::__all_ok! {
1708 fut0: $fut0;
1709 fut1: $fut1;
1710 fut2: $fut2;
1711 }
1712 };
1713 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1714 $crate::__all_ok! {
1715 fut0: $fut0;
1716 fut1: $fut1;
1717 fut2: $fut2;
1718 fut3: $fut3;
1719 }
1720 };
1721 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1722 $crate::__all_ok! {
1723 fut0: $fut0;
1724 fut1: $fut1;
1725 fut2: $fut2;
1726 fut3: $fut3;
1727 fut4: $fut4;
1728 }
1729 };
1730 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1731 $crate::__all_ok! {
1732 fut0: $fut0;
1733 fut1: $fut1;
1734 fut2: $fut2;
1735 fut3: $fut3;
1736 fut4: $fut4;
1737 fut5: $fut5;
1738 }
1739 };
1740 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1741 $crate::__all_ok! {
1742 fut0: $fut0;
1743 fut1: $fut1;
1744 fut2: $fut2;
1745 fut3: $fut3;
1746 fut4: $fut4;
1747 fut5: $fut5;
1748 fut6: $fut6;
1749 }
1750 };
1751 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1752 $crate::__all_ok! {
1753 fut0: $fut0;
1754 fut1: $fut1;
1755 fut2: $fut2;
1756 fut3: $fut3;
1757 fut4: $fut4;
1758 fut5: $fut5;
1759 fut6: $fut6;
1760 fut7: $fut7;
1761 }
1762 };
1763 ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all_ok; $($fut),+ } }
1764}
1765
1766#[doc(hidden)]
1767#[macro_export]
1768macro_rules! __all_ok {
1769 ($($ident:ident: $fut: expr;)+) => {
1770 {
1771 $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
1772 $crate::future_fn(move |cx| {
1773 use std::task::Poll;
1774
1775 let mut pending = false;
1776
1777 $(
1778 if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
1779 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1780 // Future::poll call, so it will not move.
1781 let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
1782 if let Poll::Ready(r) = fut.as_mut().poll(cx) {
1783 match r {
1784 Ok(r) => {
1785 $ident = $crate::FutureOrOutput::Output(Ok(r))
1786 },
1787 Err(e) => return Poll::Ready(Err(e)),
1788 }
1789 } else {
1790 pending = true;
1791 }
1792 }
1793 )+
1794
1795 if pending {
1796 Poll::Pending
1797 } else {
1798 Poll::Ready(Ok((
1799 $($ident.take_output().unwrap()),+
1800 )))
1801 }
1802 })
1803 }
1804 }
1805}
1806
1807/// A future that awaits on all `futures` at the same time and returns when all futures are `Ok(_)` or any future is `Err(_)`.
1808///
1809/// This is the dynamic version of [`all_ok!`].
1810pub async fn all_ok<Ok, Err, F: IntoFuture<Output = Result<Ok, Err>>>(futures: impl IntoIterator<Item = F>) -> Result<Vec<Ok>, Err> {
1811 let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
1812 future_fn(move |cx| {
1813 let mut pending = false;
1814 for input in &mut futures {
1815 if let FutureOrOutput::Future(fut) = input {
1816 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1817 // Future::poll call, so it will not move.
1818 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
1819 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
1820 match r {
1821 Ok(r) => *input = FutureOrOutput::Output(Ok(r)),
1822 Err(e) => return Poll::Ready(Err(e)),
1823 }
1824 } else {
1825 pending = true;
1826 }
1827 }
1828 }
1829
1830 if pending {
1831 Poll::Pending
1832 } else {
1833 Poll::Ready(Ok(futures
1834 .iter_mut()
1835 .map(|f| f.take_output().unwrap_or_else(|_| unreachable!()))
1836 .collect()))
1837 }
1838 })
1839 .await
1840}
1841
1842/// <span data-del-macro-root></span> A future that is ready when all futures are ready with `Some(T)` or when any
1843/// is future ready with `None`.
1844///
1845/// The macro input is comma separated list of future expressions, the futures must
1846/// all have the `Option<T>` output type, but each can have a different `T`. The macro output is a future that when ".awaited"
1847/// produces `Some<(T0, T1, ..)>` if all futures where `Some(T)` or `None` if any of the futures where `None`.
1848///
1849/// At least one input future is required and any number of futures is accepted. For more than
1850/// eight futures a proc-macro is used which may cause code auto-complete to stop working in
1851/// some IDEs.
1852///
1853/// After one future is ready and `None` the other futures are not polled again and are dropped. After a future
1854/// is ready it is also not polled again and dropped.
1855///
1856/// Each input must implement [`IntoFuture`] with the same `Output` type. Note that each input must be
1857/// known at compile time, use the [`fn@all_some`] async function to await on all futures in a dynamic list of futures.
1858///
1859/// # Examples
1860///
1861/// Await for the first of three futures to complete with `Some`:
1862///
1863/// ```
1864/// use zng_task as task;
1865/// # task::doc_test(false, async {
1866/// let r = task::all_some!(
1867/// task::run(async { Some('a') }),
1868/// task::wait(|| Some('b')),
1869/// async { Some('c') }
1870/// ).await;
1871///
1872/// assert_eq!(Some(('a', 'b', 'c')), r);
1873/// # });
1874/// ```
1875///
1876/// Completes with `None` if any future completes with `None`:
1877///
1878/// ```
1879/// # use zng_task as task;
1880/// # task::doc_test(false, async {
1881/// let r = task::all_some!(
1882/// task::run(async { Some('a') }),
1883/// task::wait(|| None::<char>),
1884/// async { Some('b') }
1885/// ).await;
1886///
1887/// assert_eq!(None, r);
1888/// # });
1889/// ```
1890#[macro_export]
1891macro_rules! all_some {
1892 ($fut0:expr $(,)?) => { $crate::__all_some! { fut0: $fut0; } };
1893 ($fut0:expr, $fut1:expr $(,)?) => {
1894 $crate::__all_some! {
1895 fut0: $fut0;
1896 fut1: $fut1;
1897 }
1898 };
1899 ($fut0:expr, $fut1:expr, $fut2:expr $(,)?) => {
1900 $crate::__all_some! {
1901 fut0: $fut0;
1902 fut1: $fut1;
1903 fut2: $fut2;
1904 }
1905 };
1906 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr $(,)?) => {
1907 $crate::__all_some! {
1908 fut0: $fut0;
1909 fut1: $fut1;
1910 fut2: $fut2;
1911 fut3: $fut3;
1912 }
1913 };
1914 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr $(,)?) => {
1915 $crate::__all_some! {
1916 fut0: $fut0;
1917 fut1: $fut1;
1918 fut2: $fut2;
1919 fut3: $fut3;
1920 fut4: $fut4;
1921 }
1922 };
1923 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr $(,)?) => {
1924 $crate::__all_some! {
1925 fut0: $fut0;
1926 fut1: $fut1;
1927 fut2: $fut2;
1928 fut3: $fut3;
1929 fut4: $fut4;
1930 fut5: $fut5;
1931 }
1932 };
1933 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr $(,)?) => {
1934 $crate::__all_some! {
1935 fut0: $fut0;
1936 fut1: $fut1;
1937 fut2: $fut2;
1938 fut3: $fut3;
1939 fut4: $fut4;
1940 fut5: $fut5;
1941 fut6: $fut6;
1942 }
1943 };
1944 ($fut0:expr, $fut1:expr, $fut2:expr, $fut3:expr, $fut4:expr, $fut5:expr, $fut6:expr, $fut7:expr $(,)?) => {
1945 $crate::__all_some! {
1946 fut0: $fut0;
1947 fut1: $fut1;
1948 fut2: $fut2;
1949 fut3: $fut3;
1950 fut4: $fut4;
1951 fut5: $fut5;
1952 fut6: $fut6;
1953 fut7: $fut7;
1954 }
1955 };
1956 ($($fut:expr),+ $(,)?) => { $crate::__proc_any_all!{ $crate::__all_some; $($fut),+ } }
1957}
1958
1959#[doc(hidden)]
1960#[macro_export]
1961macro_rules! __all_some {
1962 ($($ident:ident: $fut: expr;)+) => {
1963 {
1964 $(let mut $ident = $crate::FutureOrOutput::Future(std::future::IntoFuture::into_future($fut));)+
1965 $crate::future_fn(move |cx| {
1966 use std::task::Poll;
1967
1968 let mut pending = false;
1969
1970 $(
1971 if let $crate::FutureOrOutput::Future(fut) = &mut $ident {
1972 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
1973 // Future::poll call, so it will not move.
1974 let mut fut = unsafe { std::pin::Pin::new_unchecked(fut) };
1975 if let Poll::Ready(r) = fut.as_mut().poll(cx) {
1976 if r.is_none() {
1977 return Poll::Ready(None);
1978 }
1979
1980 $ident = $crate::FutureOrOutput::Output(r);
1981 } else {
1982 pending = true;
1983 }
1984 }
1985 )+
1986
1987 if pending {
1988 Poll::Pending
1989 } else {
1990 Poll::Ready(Some((
1991 $($ident.take_output().unwrap()),+
1992 )))
1993 }
1994 })
1995 }
1996 }
1997}
1998
1999/// A future that awaits on all `futures` at the same time and returns when all futures are `Some(_)` or any future is `None`.
2000///
2001/// This is the dynamic version of [`all_some!`].
2002pub async fn all_some<Some, F: IntoFuture<Output = Option<Some>>>(futures: impl IntoIterator<Item = F>) -> Option<Vec<Some>> {
2003 let mut futures: Vec<_> = futures.into_iter().map(|f| FutureOrOutput::Future(f.into_future())).collect();
2004 future_fn(move |cx| {
2005 let mut pending = false;
2006 for input in &mut futures {
2007 if let FutureOrOutput::Future(fut) = input {
2008 // SAFETY: the closure owns $ident and is an exclusive borrow inside a
2009 // Future::poll call, so it will not move.
2010 let mut fut_mut = unsafe { std::pin::Pin::new_unchecked(fut) };
2011 if let Poll::Ready(r) = fut_mut.as_mut().poll(cx) {
2012 match r {
2013 Some(r) => *input = FutureOrOutput::Output(Some(r)),
2014 None => return Poll::Ready(None),
2015 }
2016 } else {
2017 pending = true;
2018 }
2019 }
2020 }
2021
2022 if pending {
2023 Poll::Pending
2024 } else {
2025 Poll::Ready(Some(futures.iter_mut().map(|f| f.take_output().unwrap()).collect()))
2026 }
2027 })
2028 .await
2029}
2030
2031/// A future that will await until [`set`] is called.
2032///
2033/// # Examples
2034///
2035/// Spawns a parallel task that only writes to stdout after the main thread sets the signal:
2036///
2037/// ```
2038/// use zng_task::{self as task, *};
2039/// use zng_clone_move::async_clmv;
2040///
2041/// let signal = SignalOnce::default();
2042///
2043/// task::spawn(async_clmv!(signal, {
2044/// signal.await;
2045/// println!("After Signal!");
2046/// }));
2047///
2048/// signal.set();
2049/// ```
2050///
2051/// [`set`]: SignalOnce::set
2052#[derive(Default, Clone)]
2053pub struct SignalOnce(Arc<SignalInner>);
2054impl fmt::Debug for SignalOnce {
2055 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2056 write!(f, "SignalOnce({})", self.is_set())
2057 }
2058}
2059impl PartialEq for SignalOnce {
2060 fn eq(&self, other: &Self) -> bool {
2061 Arc::ptr_eq(&self.0, &other.0)
2062 }
2063}
2064impl Eq for SignalOnce {}
2065impl Hash for SignalOnce {
2066 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
2067 Arc::as_ptr(&self.0).hash(state)
2068 }
2069}
2070impl SignalOnce {
2071 /// New unsigned.
2072 pub fn new() -> Self {
2073 Self::default()
2074 }
2075
2076 /// New signaled.
2077 pub fn new_set() -> Self {
2078 let s = Self::new();
2079 s.set();
2080 s
2081 }
2082
2083 /// If the signal was set.
2084 pub fn is_set(&self) -> bool {
2085 self.0.signaled.load(Ordering::Relaxed)
2086 }
2087
2088 /// Sets the signal and awakes listeners.
2089 pub fn set(&self) {
2090 if !self.0.signaled.swap(true, Ordering::Relaxed) {
2091 let listeners = mem::take(&mut *self.0.listeners.lock());
2092 for listener in listeners {
2093 listener.wake();
2094 }
2095 }
2096 }
2097}
2098impl Future for SignalOnce {
2099 type Output = ();
2100
2101 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<()> {
2102 if self.as_ref().is_set() {
2103 Poll::Ready(())
2104 } else {
2105 let mut listeners = self.0.listeners.lock();
2106 let waker = cx.waker();
2107 if !listeners.iter().any(|w| w.will_wake(waker)) {
2108 listeners.push(waker.clone());
2109 }
2110 Poll::Pending
2111 }
2112 }
2113}
2114
2115#[derive(Default)]
2116struct SignalInner {
2117 signaled: AtomicBool,
2118 listeners: Mutex<Vec<std::task::Waker>>,
2119}
2120
2121/// A [`Waker`] that dispatches a wake call to multiple other wakers.
2122///
2123/// This is useful for sharing one wake source with multiple [`Waker`] clients that may not be all
2124/// known at the moment the first request is made.
2125///
2126/// [`Waker`]: std::task::Waker
2127#[derive(Clone)]
2128pub struct McWaker(Arc<WakeVec>);
2129
2130#[derive(Default)]
2131struct WakeVec(Mutex<Vec<std::task::Waker>>);
2132impl WakeVec {
2133 fn push(&self, waker: std::task::Waker) -> bool {
2134 let mut v = self.0.lock();
2135
2136 let return_waker = v.is_empty();
2137
2138 v.push(waker);
2139
2140 return_waker
2141 }
2142
2143 fn cancel(&self) {
2144 let mut v = self.0.lock();
2145
2146 debug_assert!(!v.is_empty(), "called cancel on an empty McWaker");
2147
2148 v.clear();
2149 }
2150}
2151impl std::task::Wake for WakeVec {
2152 fn wake(self: Arc<Self>) {
2153 for w in mem::take(&mut *self.0.lock()) {
2154 w.wake();
2155 }
2156 }
2157}
2158impl McWaker {
2159 /// New empty waker.
2160 pub fn empty() -> Self {
2161 Self(Arc::new(WakeVec::default()))
2162 }
2163
2164 /// Register a `waker` to wake once when `self` awakes.
2165 ///
2166 /// Returns `Some(self as waker)` if `self` was previously empty, if `None` is returned [`Poll::Pending`] must
2167 /// be returned, if a waker is returned the shared resource must be polled using the waker, if the shared resource
2168 /// is ready [`cancel`] must be called.
2169 ///
2170 /// [`cancel`]: Self::cancel
2171 pub fn push(&self, waker: std::task::Waker) -> Option<std::task::Waker> {
2172 if self.0.push(waker) { Some(self.0.clone().into()) } else { None }
2173 }
2174
2175 /// Clear current registered wakers.
2176 pub fn cancel(&self) {
2177 self.0.cancel()
2178 }
2179}