Module ipc


IPC tasks.

This module uses the ipc_channel and duct crates to define a worker process that can run tasks in a separate process instance.

Each worker process can run multiple tasks in parallel; the worker type is Worker. Note that this module does not offer a fork implementation: worker processes begin from the start state. The primary use of process tasks is to make otherwise fatal tasks recoverable. If a task calls unsafe code, or code that can potentially terminate the entire process, it should run in a Worker. If you only want to recover from panics in safe code, consider using task::run_catch or task::wait_catch instead.
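To illustrate the distinction between a recoverable panic and a fatal termination, here is a minimal sketch that uses only the standard library. The helper name `run_catching` is hypothetical and this is not how task::run_catch is implemented; it only shows that a panic in safe code can be caught inside the same process, which is the case where a Worker is not required.

```rust
use std::panic;

/// Hypothetical helper: runs `task` and converts a panic into an `Err`
/// carrying the panic message. Not part of this module's API.
fn run_catching<R>(task: impl FnOnce() -> R + panic::UnwindSafe) -> Result<R, String> {
    panic::catch_unwind(task).map_err(|payload| {
        // Panic payloads are usually `&str` or `String`.
        if let Some(s) = payload.downcast_ref::<&str>() {
            s.to_string()
        } else if let Some(s) = payload.downcast_ref::<String>() {
            s.clone()
        } else {
            "unknown panic payload".to_owned()
        }
    })
}
```

Calling `run_catching(|| 1 + 1)` yields `Ok(2)`, and a closure that panics yields an `Err` with the message. A call such as `std::process::abort()`, or a crash inside unsafe code, cannot be caught this way because it ends the whole process; only a separate worker process keeps the app-process alive in that case.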

This module also re-exports some ipc_channel types and functions. You can send IPC channels in the task request messages; this can be useful for implementing progress reporting or for transferring large byte blobs.

§Examples

The example below demonstrates a worker-process setup that uses the same executable as the app-process.

fn main() {
    zng::env::init!();
    // normal app init..
}

mod task1 {
    use zng::{task::ipc, env};

    const NAME: &str = "zng::example::task1";

    env::on_process_start!(|args| {
        // give tracing handlers a chance to observe the worker-process
        if args.yield_count == 0 { return args.yield_once(); }
        ipc::run_worker(NAME, work);
    });
    async fn work(args: ipc::RequestArgs<Request>) -> Response {
        let rsp = format!("received 'task1' request `{:?}` in worker-process #{}", &args.request.data, std::process::id());
        Response { data: rsp }
    }
     
    #[derive(Debug, serde::Serialize, serde::Deserialize)]
    pub struct Request { pub data: String }

    #[derive(Debug, serde::Serialize, serde::Deserialize)]
    pub struct Response { pub data: String }

    // called in app-process
    pub async fn start() -> ipc::Worker<Request, Response> {
        ipc::Worker::start(NAME).await.expect("cannot spawn 'task1'")
    }
}

// This runs in the app-process, it starts a worker process and requests a task run.
async fn on_click() {
    println!("app-process #{} starting a worker", std::process::id());
    let mut worker = task1::start().await;
    // request a task run and await it.
    match worker.run(task1::Request { data: "request".to_owned() }).await {
        Ok(task1::Response { data }) => println!("ok. {data}"),
        Err(e) => eprintln!("error: {e}"),
    }
    // multiple tasks can be requested in parallel, use `task::all!` to await ..

    // the worker process can be gracefully shutdown, awaits all pending tasks.
    let _ = worker.shutdown().await;
}

Note that you can set up multiple workers in the same executable, as long as each on_process_start! call happens in a different module.

§Connect Timeout

If the worker process takes longer than 10 seconds to connect, the task fails. This is more than enough in most cases, but it can be too little on some test runner machines. You can set the "ZNG_TASK_WORKER_TIMEOUT" environment variable to a custom timeout in seconds. The minimum value is 1 second; set it to 0 or leave it empty to use the default timeout.
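The timeout rule above can be sketched with the standard library alone. This is an illustration of the documented behavior, not the crate's actual implementation: the function names `parse_worker_timeout` and `worker_timeout_from_env` are hypothetical, and falling back to the default for values that are not whole numbers is an assumption.

```rust
use std::time::Duration;

/// Default connect timeout described in the text.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);

/// Hypothetical helper: interprets the raw variable value as whole seconds.
/// Unset, empty, `0`, or unparsable values (assumption) use the default.
fn parse_worker_timeout(raw: Option<&str>) -> Duration {
    let secs = raw.map(str::trim).and_then(|s| s.parse::<u64>().ok());
    match secs {
        None | Some(0) => DEFAULT_TIMEOUT,
        // Any non-zero whole number already satisfies the 1 second minimum.
        Some(s) => Duration::from_secs(s),
    }
}

/// Hypothetical helper: reads the variable named in the text.
fn worker_timeout_from_env() -> Duration {
    parse_worker_timeout(std::env::var("ZNG_TASK_WORKER_TIMEOUT").ok().as_deref())
}
```

On a slow test runner you would typically export the variable before launching the app-process, for example with a value of `30`, which this sketch maps to a 30 second timeout.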

Re-exports§

pub use ipc_channel::ipc::IpcBytesReceiver;
pub use ipc_channel::ipc::IpcBytesSender;
pub use ipc_channel::ipc::IpcReceiver;
pub use ipc_channel::ipc::IpcSender;
pub use ipc_channel::ipc::bytes_channel;

Structs§

RequestArgs
Arguments for run_worker.
Worker
Represents a running worker process.
WorkerCrashError
Info about a worker process crash.

Enums§

RunError
Worker run error.

Constants§

VERSION
The App Process and Worker Process must be built using the exact same version. This is validated at run-time, causing a panic if the versions don’t match.

Traits§

IpcValue
Represents a type that can be an input and output of IPC workers.

Functions§

run_worker
If the process was started by a Worker, runs the worker loop and never returns. If it was not started as a worker, does nothing.