Module task

Parallel async tasks and async task runners.

Use run, respond or spawn to run parallel tasks, use wait, io and fs to unblock IO operations and use http for async HTTP.
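
As a quick orientation, the sketch below shows the three spawn styles side by side; the values and closure bodies are purely illustrative.

use zng::prelude::*;

async fn spawn_styles() {
    // fire and forget, the task starts executing immediately
    task::spawn(async {
        println!("background work");
    });

    // parallel task that reports its result through a response var
    let _rsp = task::respond(async { 40 + 2 });

    // parallel task that can be awaited directly for its result
    let out: i32 = task::run(async { 40 + 2 }).await;
    println!("sum: {out}");
}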

All functions of this module propagate the LocalContext.

This crate also re-exports the rayon and parking_lot crates for convenience. You can use the ParallelIteratorExt::with_ctx adapter in rayon iterators to propagate the LocalContext. You can also use join to propagate thread context for a raw rayon join operation.
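
For example, a minimal sketch of both context propagation helpers (the workload below is illustrative):

use zng::prelude::*;
use zng::task::rayon::prelude::*;
use zng::task::ParallelIteratorExt as _;

fn parallel_with_context(numbers: &[usize]) -> usize {
    // `with_ctx` loads the caller's LocalContext in the rayon worker threads
    let sum: usize = numbers.par_iter().with_ctx().map(|i| i * i).sum();

    // `task::join` is a rayon join that also propagates the thread context
    let (a, b) = task::join(|| sum / 2, || sum - sum / 2);
    a + b
}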

§Examples

use zng::prelude::*;

let enabled = var(false);
Button! {
    on_click = async_hn!(enabled, |_| {
        enabled.set(false);

        let sum_task = task::run(async {
            let numbers = read_numbers().await;
            numbers.par_iter().map(|i| i * i).sum()
        });
        let sum: usize = sum_task.await;
        println!("sum of squares: {sum}");

        enabled.set(true);
    });
    widget::enabled = enabled;
}

async fn read_numbers() -> Vec<usize> {
    let raw = task::wait(|| std::fs::read_to_string("numbers.txt").unwrap()).await;
    raw.par_split(',').map(|s| s.trim().parse::<usize>().unwrap()).collect()
}

The example demonstrates three different tasks. The first is a UiTask created by the async_hn handler; this task is async but not parallel, meaning it may take more than one app update to complete but only ever executes in the on_click context and thread. This is good for coordinating UI state, like setting variables, but not for CPU intensive work.

To keep the app responsive we move the computation into a run task; this task is async and parallel, meaning it can .await and executes in parallel threads. It runs in rayon, so you can easily make the task multi-threaded, and when it is done it sends the result back to the widget task that is awaiting it. That solves the responsiveness problem, but one problem remains: how to avoid blocking one of the worker threads while waiting on IO.

We want to keep the run threads either doing work or available for other tasks, but reading a file is just waiting on a potentially slow external operation, so calling std::fs::read_to_string directly could take one of the worker threads out of play and reduce overall task throughput. To avoid this we move the IO operation into a wait task; this task is not async but it is parallel, meaning it does not block the caller while it runs a blocking operation. It runs inside a blocking thread-pool that is optimized for waiting.

§Async IO

You can use wait, io and fs to do async IO. Zng uses this API for its internal async IO; it is mostly a selection of external async crates re-exported for convenience and compatibility.

The io module re-exports the futures-lite::io traits and types, adding only progress tracking. The fs module is the async-fs crate. Most async IO operations are implemented through extension traits, so we recommend glob importing io when implementing async IO.

use zng::prelude::*;
use zng::task::io::*;

async fn read_numbers() -> Result<Vec<usize>, Box<dyn std::error::Error + Send + Sync>> {
    let mut file = task::fs::File::open("numbers.txt").await?;
    let mut raw = String::new();
    file.read_to_string(&mut raw).await?;
    raw.par_split(',').map(|s| s.trim().parse::<usize>().map_err(Into::into)).collect()
}

All the std::fs synchronous operations have an async counterpart in fs. For simpler one-shot operations it is recommended to just use std::fs inside wait; the async fs types are not async at the OS level, they only offload operations to the same thread-pool used by wait.
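
For instance, a minimal sketch of a one-shot read offloaded to the blocking thread-pool (the file name is illustrative):

use zng::prelude::*;

async fn read_config() -> std::io::Result<String> {
    // runs the blocking read in the blocking thread-pool, without tying up a rayon worker
    task::wait(|| std::fs::read_to_string("config.toml")).await
}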

§HTTP Client

You can use http to make asynchronous HTTP requests. Zng also uses the http module internally, for example to load an image from a URL; the module is a thin wrapper around the isahc crate.

use zng::prelude::*;

let enabled = var(false);
let msg = var("loading..".to_txt());
Button! {
    on_click = async_hn!(enabled, msg, |_| {
        enabled.set(false);

        match task::http::get_txt("https://httpbin.org/get").await {
            Ok(r) => msg.set(r),
            Err(e) => msg.set(formatx!("error: {e}")),
        }

        enabled.set(true);
    });
}

For other protocols or alternative HTTP clients you can use external crates.

§Async Crates Integration

You can use external async crates to create futures and then .await them in async code managed by Zng, but some care is needed. Async code needs a runtime to execute, and some async functions from external crates expect their own runtime to work properly. As a rule of thumb, if the crate starts its own event reactor you can just use it without worry.

You can use the futures, async-std and smol crates without worry; they integrate well and even use the same blocking thread-pool used by wait. Functions that require an event reactor start it automatically, usually at the cost of a single extra thread. Just .await futures from these crates.
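
For example, a minimal sketch awaiting futures from these crates inside a Zng task (the futures and delay value are illustrative, and the futures and async-std crates are assumed to be direct dependencies):

use zng::prelude::*;

async fn external_futures() {
    // plain combinators from the futures crate work as-is
    let (a, b) = futures::future::join(async { 1 }, async { 2 }).await;
    println!("joined: {a} {b}");

    // async-std timers start their reactor automatically on first use
    async_std::task::sleep(std::time::Duration::from_millis(10)).await;
}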

The tokio crate, on the other hand, does not integrate well. It does not start its own runtime automatically and expects you to call its async functions from inside the tokio runtime. After you create a future from inside the runtime you can .await it in any thread, so we recommend manually starting its runtime in a thread and then using the tokio::runtime::Handle to start futures in the runtime.
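
A minimal sketch of that setup, assuming tokio is a direct dependency (the helper and workload below are illustrative, not a Zng API):

use tokio::runtime::{Builder, Handle};

// start a tokio runtime in a background thread and return a Handle to it
fn start_tokio() -> Handle {
    let (tx, rx) = std::sync::mpsc::channel();
    std::thread::spawn(move || {
        let rt = Builder::new_multi_thread().enable_all().build().unwrap();
        tx.send(rt.handle().clone()).unwrap();
        // keep the runtime alive for the lifetime of the thread
        rt.block_on(std::future::pending::<()>());
    });
    rx.recv().unwrap()
}

async fn use_tokio(handle: Handle) -> i32 {
    // the future is created and runs inside the tokio runtime;
    // the JoinHandle it returns can be awaited from any thread
    handle
        .spawn(async {
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            42
        })
        .await
        .unwrap()
}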

External tasks also don't propagate the thread context, so if you want access to app services or want to set vars inside external parallel closures you must capture and load the LocalContext manually.
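
A minimal sketch of manual context propagation, assuming the LocalContext::capture and with_context methods from the underlying zng-app-context crate (verify the exact path and signatures in its docs):

use zng::prelude::*;
use zng_app_context::LocalContext;

fn spawn_external_with_ctx() {
    // capture the context on a thread that has it, e.g. inside a widget handler
    let mut ctx = LocalContext::capture();
    std::thread::spawn(move || {
        // load the captured context before touching app services or vars
        ctx.with_context(|| {
            println!("app context is available here");
        });
    });
}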

§Full API

This module fully re-exports zng_task.

Re-exports§

pub use zng_task::parking_lot;
pub use zng_task::rayon;

Modules§

channel
Async channels.
fs
Async filesystem primitives.
http
HTTP client.
io
IO tasks.
ipc
IPC tasks.

Macros§

all
A future that zips other futures.
all_ok
A future that is ready when all futures are ready with an Ok(T) result or any future is ready with an Err(E) result.
all_some
A future that is ready when all futures are ready with Some(T) or when any future is ready with None.
any
A future that awaits for the first future that is ready.
any_ok
A future that waits for the first future that is ready with an Ok(T) result.
any_some
A future that is ready when any of the futures is ready and Some(T).

Structs§

DeadlineError
Error when with_deadline reaches a time limit before a task finishes.
McWaker
A Waker that dispatches a wake call to multiple other wakers.
ParallelIteratorWithCtx
Parallel iterator adapter that propagates the thread context.
Progress
Status update about a task progress.
ScopeCtx
Represents a fork-join scope which can be used to spawn any number of tasks that run in the caller’s thread context.
SignalOnce
A future that will await until set is called.
TaskPanicError
Panic payload, captured by std::panic::catch_unwind.
UiTask
Represents a Future running in sync with the UI.

Traits§

ParallelIteratorExt
Extends rayon’s ParallelIterator with thread context.
UiTaskWidget
Integrate UiTask with widget updates.

Functions§

all
A future that awaits on all futures at the same time and returns all results when all futures are ready.
all_ok
A future that awaits on all futures at the same time and returns when all futures are Ok(_) or any future is Err(_).
all_some
A future that awaits on all futures at the same time and returns when all futures are Some(_) or any future is None.
any
A future that awaits on all futures at the same time and returns the first result when the first future is ready.
any_ok
A future that awaits on all futures at the same time and returns when any future is Ok(_) or all are Err(_).
any_some
A future that awaits on all futures at the same time and returns when any future is Some(_) or all are None.
block_on
Blocks the thread until the task future finishes.
deadline
A future that is Pending until the deadline is reached.
doc_test
Executor used in async doc tests.
future_fn
Implements a Future from a closure.
join
Rayon join with local context.
join_context
Rayon join context with local context.
poll_respond
Polls the task once immediately on the calling thread; if the task is ready, returns the response already set; if the task is pending, continues execution like respond.
poll_spawn
Polls the task once immediately on the calling thread; if the task is pending, continues execution in spawn.
respond
Spawn a parallel async task that will send its result to a ResponseVar<R>.
run
Spawn a parallel async task that can also be .awaited for the task result.
run_catch
Like run but catches panics.
scope
Rayon scope with local context.
set_spawn_panic_handler
Set a handler that is called when spawn tasks panic.
spawn
Spawn a parallel async task; this function is not blocking and the task starts executing immediately.
spawn_wait
Fire and forget a wait task. The task starts executing immediately.
spin_on
Continuously polls the task until it finishes.
wait
Create a parallel task that blocks awaiting an IO operation; the task starts on the first .await.
wait_catch
Like wait but catches panics.
wait_respond
Like spawn_wait, but the task will send its result to a ResponseVar<R>.
with_deadline
Add a deadline to a future.
yield_now
A future that is Pending once and wakes the current task.