//! Parallel async tasks and async task runners.
//!
//! Use [`run`], [`respond`] or [`spawn`] to run parallel tasks, use [`wait`], [`io`] and [`fs`] to unblock
//! IO operations and use [`http`] for async HTTP.
//!
//! All functions of this module propagate the [`LocalContext`].
//!
//! This crate also re-exports the [`rayon`] and [`parking_lot`] crates for convenience. You can use the
//! [`with_ctx`] adapter in rayon iterators to propagate the [`LocalContext`]. You can
//! also use [`join`] to propagate thread context for a raw rayon join operation.
//!
//! [`with_ctx`]: crate::task::rayon::ParallelIteratorExt::with_ctx
//!
//! # Examples
//!
//! ```
//! use zng::prelude::*;
//!
//! # fn example() {
//! // starts enabled, the handler disables the button only while the task runs
//! let enabled = var(true);
//! # let _ =
//! Button! {
//!     on_click = async_hn!(enabled, |_| {
//!         enabled.set(false);
//!
//!         let sum_task = task::run(async {
//!             let numbers = read_numbers().await;
//!             numbers.par_iter().map(|i| i * i).sum()
//!         });
//!         let sum: usize = sum_task.await;
//!         println!("sum of squares: {sum}");
//!
//!         enabled.set(true);
//!     });
//!     widget::enabled = enabled;
//! }
//! # ; }
//!
//! async fn read_numbers() -> Vec<usize> {
//!     let raw = task::wait(|| std::fs::read_to_string("numbers.txt").unwrap()).await;
//!     raw.par_split(',').map(|s| s.trim().parse::<usize>().unwrap()).collect()
//! }
//! ```
//!
//! The example demonstrates three different ***tasks***. The first is a [`UiTask`] in the `async_hn` handler,
//! this task is *async* but not *parallel*, meaning that it will execute in more than one app update, but it will only execute in the
//! `on_click` context and thread. This is good for coordinating UI state, like setting variables, but is not good if you want to do CPU intensive work.
//!
//! To keep the app responsive we move the computation work inside a [`run`] task, this task is *async* and *parallel*,
//! meaning it can `.await` and will execute in parallel threads. It runs in [`rayon`] so you can
//! easily make the task multi-threaded, and when it is done it sends the result back to the widget task that is awaiting it. We
//! resolved the responsiveness problem, but there is one extra problem to solve: how to avoid blocking one of the worker threads while waiting for IO.
//!
//! We want to keep the [`run`] threads either doing work or available for other tasks, but reading a file is just waiting
//! for a potentially slow external operation, so if we call [`std::fs::read_to_string`] directly we can potentially remove one of
//! the worker threads from play, reducing the overall task performance. To avoid this we move the IO operation inside a [`wait`]
//! task, this task is not *async* but it is *parallel*, meaning it cannot `.await` but it can run a blocking operation without
//! stalling the worker threads. It runs inside a [`blocking`] thread-pool that is optimized for waiting.
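//!
//! The reason for moving blocking calls out of the worker threads can be sketched with only `std`. This is a minimal
//! illustration and not how [`wait`] is implemented: `offload` is a hypothetical helper that uses one dedicated thread per call
//! instead of a thread-pool, and `sum_of_squares` is a sequential stand-in for the computation in the example above.
//!
//! ```rust
//! use std::sync::mpsc;
//! use std::thread;
//!
//! /// Hypothetical helper: runs `blocking` on a dedicated thread and returns a
//! /// receiver that yields the result once the operation finishes.
//! fn offload<T, F>(blocking: F) -> mpsc::Receiver<T>
//! where
//!     T: Send + 'static,
//!     F: FnOnce() -> T + Send + 'static,
//! {
//!     let (sender, receiver) = mpsc::channel();
//!     thread::spawn(move || {
//!         // The blocking call only stalls this dedicated thread.
//!         // A send error just means the caller dropped the receiver.
//!         let _ = sender.send(blocking());
//!     });
//!     receiver
//! }
//!
//! /// Sequential stand-in: sums the squares of comma separated numbers.
//! fn sum_of_squares(raw: &str) -> usize {
//!     raw.split(',')
//!         .map(|s| s.trim().parse::<usize>().unwrap())
//!         .map(|n| n * n)
//!         .sum()
//! }
//! ```
//!
//! In this sketch `offload(|| std::fs::read_to_string("numbers.txt"))` returns immediately and only the final `recv` call waits.
//! With [`wait`] that last step is an `.await`, so the calling thread is not blocked either.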
//!
//! # Async IO
//!
//! You can use [`wait`], [`io`] and [`fs`] to do async IO. Zng uses this API for internal async IO, the modules are just a selection
//! of external async crates re-exported for convenience and compatibility.
//!
//! The [`io`] module just re-exports the [`futures-lite::io`] traits and types, adding only progress tracking. The
//! [`fs`] module is the [`async-fs`] crate. Most of the async IO operations are implemented using extension traits
//! so we recommend glob importing [`io`] to start implementing async IO.
//!
//! ```
//! use zng::prelude::*;
//!
//! async fn read_numbers() -> Result<Vec<usize>, Box<dyn std::error::Error + Send + Sync>> {
//!     let mut file = task::fs::File::open("numbers.txt").await?;
//!     let mut raw = String::new();
//!     file.read_to_string(&mut raw).await?;
//!     raw.par_split(',').map(|s| s.trim().parse::<usize>().map_err(Into::into)).collect()
//! }
//! ```
//!
//! All the `std::fs` synchronous operations have an async counterpart in [`fs`]. For simple one-shot
//! operations it is recommended to just use `std::fs` inside [`wait`], because the async [`fs`] types are not async at
//! the OS level, they only offload operations inside the same thread-pool used by [`wait`].
//!
//! # HTTP Client
//!
//! You can use [`http`] to implement asynchronous HTTP requests. Zng also uses the [`http`] module for
//! implementing operations such as loading an image from a given URL. The module defines an HTTP client API
//! that is backend agnostic. By default it uses the system `curl` command
//! line utility with a simple cache, this can be replaced using the full API.
//!
//! ```
//! use zng::prelude::*;
//!
//! # fn example() {
//! let enabled = var(false);
//! let msg = var("loading..".to_txt());
//! # let _ =
//! Button! {
//!     on_click = async_hn!(enabled, msg, |_| {
//!         enabled.set(false);
//!
//!         match task::http::get_txt("https://httpbin.org/get").await {
//!             Ok(r) => msg.set(r),
//!             Err(e) => msg.set(formatx!("error: {e}")),
//!         }
//!
//!         enabled.set(true);
//!     });
//! }
//! # ; }
//! ```
//!
//! For other protocols or alternative HTTP clients you can use [external crates](#async-crates-integration).
//!
//! # Async Crates Integration
//!
//! You can use external async crates to create futures and then `.await` them in async code managed by Zng, but there is some
//! consideration needed. Async code needs a runtime to execute and some async functions from external crates expect their own runtime
//! to work properly. As a rule of thumb, if the crate starts its own *event reactor* you can just use it without worry.
//!
//! You can use the [`futures`], [`async-std`] and [`smol`] crates without worry, they integrate well and even use the same [`blocking`]
//! thread-pool that is used in [`wait`]. Functions that require an *event reactor* start it automatically, usually at the cost of one extra
//! thread only. Just `.await` futures from these crates.
//!
//! The [`tokio`] crate, on the other hand, does not integrate well. It does not start its own runtime automatically, and expects you
//! to call its async functions from inside the tokio runtime. After you create a future from inside the runtime you can `.await` it
//! in any thread, so we recommend manually starting its runtime in a thread and then using the `tokio::runtime::Handle` to start
//! futures in the runtime.
//!
//! External tasks also don't propagate the thread context. If you want access to app services or want to set vars inside external
//! parallel closures you must capture and load the [`LocalContext`] manually.
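//!
//! Why the manual capture is needed can be sketched with a plain `thread_local!`, used here only as a stand-in for contextual
//! values. `APP_NAME` and both functions are hypothetical names for illustration and are not part of Zng: a thread started by
//! external code sees the default value unless the caller captures the current value and loads it inside that thread.
//!
//! ```rust
//! use std::cell::RefCell;
//! use std::thread;
//!
//! thread_local! {
//!     // Stand-in for a contextual value, every new thread starts with "none".
//!     static APP_NAME: RefCell<String> = RefCell::new(String::from("none"));
//! }
//!
//! /// Reads the contextual value in a plain thread, without any propagation.
//! fn read_in_thread() -> String {
//!     thread::spawn(|| APP_NAME.with(|n| n.borrow().clone())).join().unwrap()
//! }
//!
//! /// Captures the value in the calling thread and loads it in the new thread.
//! fn read_in_thread_with_capture() -> String {
//!     // capture, runs in the calling thread
//!     let captured = APP_NAME.with(|n| n.borrow().clone());
//!     thread::spawn(move || {
//!         // load, runs in the new thread before the actual work
//!         APP_NAME.with(|n| *n.borrow_mut() = captured);
//!         APP_NAME.with(|n| n.borrow().clone())
//!     })
//!     .join()
//!     .unwrap()
//! }
//! ```
//!
//! The functions of this module do the equivalent of the second function for the whole [`LocalContext`], external runtimes
//! behave like the first.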
//!
//! [`LocalContext`]: crate::app::LocalContext
//! [`blocking`]: https://docs.rs/blocking
//! [`futures`]: https://docs.rs/futures
//! [`async-std`]: https://docs.rs/async-std
//! [`smol`]: https://docs.rs/smol
//! [`tokio`]: https://docs.rs/tokio
//! [`futures-lite::io`]: https://docs.rs/futures-lite/*/futures_lite/io/index.html
//! [`async-fs`]: https://docs.rs/async-fs
//! [`rayon`]: https://docs.rs/rayon
//! [`parking_lot`]: https://docs.rs/parking_lot
//!
//! # Full API
//!
//! See [`zng_task`] for the full API.

pub use zng_task::{
    DeadlineError, McWaker, Progress, ScopeCtx, SignalOnce, TaskPanicError, UiTask, all, all_ok, all_some, any, any_ok, any_some, block_on,
    deadline, fs, future_fn, io, join, join_context, poll_respond, poll_spawn, respond, run, run_catch, scope, set_spawn_panic_handler,
    spawn, spawn_wait, wait, wait_catch, wait_respond, with_deadline, yield_now,
};

#[cfg(any(doc, feature = "test_util"))]
pub use zng_task::{doc_test, spin_on};

/// HTTP client.
///
/// This module provides an HTTP client API that is backend agnostic. By default it uses the system `curl` command
/// line utility with a simple cache, this can be replaced using the full API.
///
/// # Examples
///
/// Get some text:
///
/// ```
/// # use zng::task;
/// # async fn demo() -> Result<(), zng::task::http::Error> {
/// let text = task::http::get_txt("https://httpbin.org/base64/SGVsbG8gV29ybGQ=").await?;
/// println!("{text}!");
/// # Ok(()) }
/// ```
///
/// # Full API
///
/// See [`zng_task::http`] for the full API.
#[cfg(feature = "http")]
pub mod http {
    pub use zng_task::http::{
        CacheMode, Error, Method, Request, Response, StatusCode, Uri, delete, get, get_bytes, get_json, get_txt, head, header, method,
        post, put, send, set_request_default, uri,
    };

    /// Remove all cached entries, or just older ones if `prune` is enabled.
    pub async fn cache_clean(prune: bool) {
        if prune {
            zng_task::http::http_cache().prune().await;
        } else {
            zng_task::http::http_cache().purge().await;
        }
    }
}

/// Communication channels.
///
/// Use [`bounded`], [`unbounded`] and [`rendezvous`] to create channels for use across threads in the same process.
/// Use [`ipc_unbounded`] to create channels that work across processes.
///
/// # Examples
///
/// ```no_run
/// use zng::prelude::*;
///
/// let (sender, receiver) = task::channel::bounded(5);
///
/// task::spawn(async move {
///     task::deadline(5.secs()).await;
///     if let Err(e) = sender.send("Data!").await {
///         eprintln!("no receiver connected, did not send message: '{e}'")
///     }
/// });
/// task::spawn(async move {
///     match receiver.recv().await {
///         Ok(msg) => println!("{msg}"),
///         Err(_) => eprintln!("no message in channel and no sender connected"),
///     }
/// });
/// ```
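///
/// The difference between a bounded and a rendezvous channel can be sketched with the `std` channels, assuming the usual
/// meaning of the terms and not the exact behavior of the types in this module. `queued_before_full` is a hypothetical
/// helper that counts how many messages a channel accepts while no receiver is taking them.
///
/// ```rust
/// use std::sync::mpsc;
///
/// /// Counts the messages accepted by `try_send` before the channel reports that it is full.
/// fn queued_before_full(capacity: usize) -> usize {
///     // The receiver is kept alive so the channel stays connected, but it never receives.
///     let (sender, _receiver) = mpsc::sync_channel::<u32>(capacity);
///     let mut queued = 0;
///     while sender.try_send(0).is_ok() {
///         queued += 1;
///     }
///     queued
/// }
/// ```
///
/// In this sketch a capacity of `5` buffers five messages and a capacity of `0` buffers none, a zero capacity `std`
/// channel only hands a message over when a receiver is already waiting, which is the rendezvous behavior.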
///
/// [`bounded`]: crate::task::channel::bounded
/// [`unbounded`]: crate::task::channel::unbounded
/// [`rendezvous`]: crate::task::channel::rendezvous
/// [`ipc_unbounded`]: crate::task::channel::ipc_unbounded
///
/// # Full API
///
/// See [`zng_task::channel`] for the full API.
pub mod channel {
    pub use zng_task::channel::{ChannelError, Receiver, Sender, bounded, rendezvous, unbounded};
    pub use zng_task::channel::{
        IpcBytes, IpcBytesCast, IpcBytesCastIntoIter, IpcBytesIntoIter, IpcBytesMut, IpcBytesMutCast, IpcBytesWriter,
        IpcBytesWriterBlocking, IpcFileHandle, IpcRead, IpcReadBlocking, IpcReadHandle, IpcReceiver, IpcSender, IpcValue, NamedIpcReceiver,
        NamedIpcSender, WeakIpcBytes, ipc_unbounded,
    };
}

#[cfg(not(wasm))]
pub use zng_task::process;

pub use zng_app::widget::UiTaskWidget;

/// Recommended blocking locks.
///
/// The `Mutex` and `RwLock` types reexported here are recommended, they are
/// more optimized than `std` alternatives because they don't implement lock poisoning,
/// have `const` initialization and integrate with the `zng` feature `"deadlock_detection"`.
///
/// # Full API
///
/// See the [`parking_lot`] crate for the full API.
///
/// [`parking_lot`]: https://docs.rs/parking_lot
pub mod parking_lot {
    use zng_task::parking_lot;

    // no_inline is not imported from zng_task
    #[doc(no_inline)]
    pub use parking_lot::{
        ArcMutexGuard, ArcRwLockReadGuard, ArcRwLockWriteGuard, Condvar, MappedMutexGuard, MappedRwLockReadGuard, MappedRwLockWriteGuard,
        Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard,
    };
}

/// Parallel iterators.
///
/// This module mostly reexports the primary traits from the [`rayon`] crate.
///
/// This module also includes [`ParallelIteratorWithCtx`] that propagates the
/// zng app context to rayon tasks.
///
/// # Full API
///
/// See the [`rayon`] crate for the full API.
///
/// [`rayon`]: https://docs.rs/rayon
/// [`ParallelIteratorWithCtx`]: crate::task::rayon::ParallelIteratorWithCtx
pub mod rayon {
    pub use zng_task::rayon::{ParallelIteratorExt, ParallelIteratorWithCtx};

    use zng_task::rayon;

    #[doc(no_inline)]
    pub use rayon::{iter, slice, str};

    /// Rayon traits imported `as _`.
    pub mod prelude {
        use zng_task::rayon;

        #[doc(no_inline)]
        pub use rayon::{
            iter::FromParallelIterator as _, iter::IndexedParallelIterator as _, iter::IntoParallelIterator as _,
            iter::IntoParallelRefIterator as _, iter::IntoParallelRefMutIterator as _, iter::ParallelBridge as _,
            iter::ParallelDrainFull as _, iter::ParallelDrainRange as _, iter::ParallelExtend as _, iter::ParallelIterator as _,
            slice::ParallelSlice as _, slice::ParallelSliceMut as _, str::ParallelString as _,
        };

        pub use zng_task::rayon::ParallelIteratorExt as _;
    }
}
295        };
296
297        pub use zng_task::rayon::ParallelIteratorExt as _;
298    }
299}