Module worker

Module worker 

Source
Expand description

Async worker process tasks.

This module defines a worker process that can run tasks in a separate process instance.

Each worker process can run multiple tasks in parallel, the worker type is Worker. Note that this module does not offer a fork implementation, the worker processes begin from the start state. The primary use of process tasks is to make otherwise fatal tasks recoverable, if the task calls unsafe code or code that can potentially terminate the entire process it should run using a Worker. If you only want to recover from panics in safe code consider using task::run_catch or task::wait_catch instead.

You can send IPC channel endpoints in the task request messages, this can be useful for implementing progress reporting, you can also send IpcBytes to efficiently share large byte blobs with the worker process.

§Examples

The example below demonstrates a worker-process setup that uses the same executable as the app-process.

fn main() {
    zng::env::init!();
    // normal app init..
}

mod task1 {
    use zng::{task::process::worker, env};

    const NAME: &str = "zng::example::task1";

    env::on_process_start!(|args| {
        // give tracing handlers a chance to observe the worker-process
        if args.yield_count == 0 { return args.yield_once(); }
        worker::run_worker(NAME, work);
    });
    async fn work(args: worker::RequestArgs<Request>) -> Response {
        let rsp = format!("received 'task1' request `{:?}` in worker-process #{}", &args.request.data, std::process::id());
        Response { data: rsp }
    }
     
    #[derive(Debug, serde::Serialize, serde::Deserialize)]
    pub struct Request { pub data: String }

    #[derive(Debug, serde::Serialize, serde::Deserialize)]
    pub struct Response { pub data: String }

    // called in app-process
    pub async fn start() -> worker::Worker<Request, Response> {
        worker::Worker::start(NAME).await.expect("cannot spawn 'task1'")
    }
}

// This runs in the app-process, it starts a worker process and requests a task run.
async fn on_click() {
    println!("app-process #{} starting a worker", std::process::id());
    let mut worker = task1::start().await;
    // request a task run and await it.
    match worker.run(task1::Request { data: "request".to_owned() }).await {
        Ok(task1::Response { data }) => println!("ok. {data}"),
        Err(e) => eprintln!("error: {e}"),
    }
    // multiple tasks can be requested in parallel, use `task::all!` to await ..

    // the worker process can be gracefully shutdown, awaits all pending tasks.
    let _ = worker.shutdown().await;
}

Note that you can setup multiple workers the same executable, as long as the on_process_start! call happens on different modules.

§Connect Timeout

If the worker process takes longer than 10 seconds to connect the tasks fails. This is more then enough in most cases, but it can be too little in some test runner machines. You can set the "ZNG_TASK_WORKER_TIMEOUT" environment variable to a custom timeout in seconds. The minimum value is 1 second, set to 0 or empty use the default timeout.

Structs§

RequestArgs
Arguments for run_worker.
Worker
Represents a running worker process.
WorkerCrashError
Info about a worker process crash.

Enums§

RunError
Worker run error.

Constants§

VERSION
The App Process and Worker Process must be build using the same exact version and this is validated during run-time, causing a panic if the versions don’t match.

Functions§

run_worker
If the process was started by a Worker runs the worker loop and never returns. If not started as worker does nothing.