Expand description
IPC tasks.
This module uses ipc_channel and duct crates to define a worker process that can run tasks in a separate process instance.
Each worker process can run multiple tasks in parallel, the worker type is Worker. Note that this module does not offer a fork
implementation, the worker processes begin from the start state. The primary use of process tasks is to make otherwise fatal tasks
recoverable, if the task calls unsafe code or code that can potentially terminate the entire process it should run using a Worker.
If you only want to recover from panics in safe code consider using task::run_catch or task::wait_catch instead.
This module also re-exports some ipc_channel types and functions. You can send IPC channels in the task request messages, this
can be useful for implementing progress reporting or to transfer large byte blobs.
§Examples
The example below demonstrates a worker-process setup that uses the same executable as the app-process.
fn main() {
zng::env::init!();
// normal app init..
}
mod task1 {
use zng::{task::ipc, env};
const NAME: &str = "zng::example::task1";
env::on_process_start!(|args| {
// give tracing handlers a chance to observe the worker-process
if args.yield_count == 0 { return args.yield_once(); }
ipc::run_worker(NAME, work);
});
async fn work(args: ipc::RequestArgs<Request>) -> Response {
let rsp = format!("received 'task1' request `{:?}` in worker-process #{}", &args.request.data, std::process::id());
Response { data: rsp }
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Request { pub data: String }
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Response { pub data: String }
// called in app-process
pub async fn start() -> ipc::Worker<Request, Response> {
ipc::Worker::start(NAME).await.expect("cannot spawn 'task1'")
}
}
// This runs in the app-process, it starts a worker process and requests a task run.
async fn on_click() {
println!("app-process #{} starting a worker", std::process::id());
let mut worker = task1::start().await;
// request a task run and await it.
match worker.run(task1::Request { data: "request".to_owned() }).await {
Ok(task1::Response { data }) => println!("ok. {data}"),
Err(e) => eprintln!("error: {e}"),
}
// multiple tasks can be requested in parallel, use `task::all!` to await ..
// the worker process can be gracefully shutdown, awaits all pending tasks.
let _ = worker.shutdown().await;
}
Note that you can setup multiple workers the same executable, as long as the on_process_start! call happens
on different modules.
§Connect Timeout
If the worker process takes longer than 10 seconds to connect the tasks fails. This is more then enough in most cases, but
it can be too little in some test runner machines. You can set the "ZNG_TASK_WORKER_TIMEOUT" environment variable to a custom
timeout in seconds. The minimum value is 1 second, set to 0 or empty use the default timeout.
Re-exports§
pub use ipc_channel::ipc::IpcBytesReceiver;pub use ipc_channel::ipc::IpcBytesSender;pub use ipc_channel::ipc::IpcReceiver;pub use ipc_channel::ipc::IpcSender;pub use ipc_channel::ipc::bytes_channel;
Structs§
- Request
Args - Arguments for
run_worker. - Worker
- Represents a running worker process.
- Worker
Crash Error - Info about a worker process crash.
Enums§
- RunError
- Worker run error.
Constants§
- VERSION
- The App Process and Worker Process must be build using the same exact version and this is validated during run-time, causing a panic if the versions don’t match.
Traits§
- IpcValue
- Represents a type that can be an input and output of IPC workers.
Functions§
- run_
worker - If the process was started by a
Workerruns the worker loop and never returns. If not started as worker does nothing.