cargo_zng/
trace.rs

1use std::{
2    collections::{HashMap, HashSet},
3    io::{Read as _, Write},
4    path::PathBuf,
5    time::SystemTime,
6};
7
8use clap::*;
9use serde::Deserialize as _;
10
// Command line arguments of `cargo zng trace`, consumed by `run` in this file.
//
// NOTE: the `///` comments on the fields are not only documentation, clap's derive uses them as
// the `--help` text of each argument, so editing them changes the CLI output.
// NOTE: the derived `Default` yields empty `filter`/`output` strings, not the clap
// `default_value`s below.
#[derive(Args, Debug, Default)]
pub struct TraceArgs {
    /// Path or command to run the Zng executable
    ///
    /// Example: `cargo zng trace "./some/exe"` or `cargo zng trace -- cargo run exe`
    // `trailing_var_arg` makes the rest of the command line part of `command`, so arguments meant
    // for the traced executable are not parsed as options of this subcommand. `run` drops a
    // leading `--` before spawning the first value as the program.
    #[arg(trailing_var_arg = true)]
    command: Vec<String>,

    /// env_logger style filter
    // forwarded to the traced process in the `ZNG_RECORD_TRACE_FILTER` environment variable
    #[arg(long, short, default_value = "trace")]
    filter: String,

    /// Output JSON file
    ///
    /// {timestamp} and {ts} is replaced with a timestamp in microseconds from Unix epoch
    // the same timestamp is also passed in `ZNG_RECORD_TRACE_TIMESTAMP` and names the temp
    // sub-directory the per-process trace files are merged from
    #[arg(long, short, default_value = "./trace-{timestamp}.json")]
    output: String,
}
29
30pub fn run(args: TraceArgs) {
31    let mut cmd = {
32        let mut cmd = args.command.into_iter().peekable();
33        if let Some(c) = cmd.peek()
34            && c == "--"
35        {
36            cmd.next();
37        }
38        if let Some(c) = cmd.next() {
39            let mut o = std::process::Command::new(c);
40            o.args(cmd);
41            o
42        } else {
43            fatal!("COMMAND is required")
44        }
45    };
46
47    let ts = SystemTime::now()
48        .duration_since(SystemTime::UNIX_EPOCH)
49        .unwrap_or_default()
50        .as_micros()
51        .to_string();
52
53    let tmp = std::env::temp_dir().join("cargo-zng-trace");
54    if let Err(e) = std::fs::create_dir_all(&tmp) {
55        fatal!("cannot create temp dir, {e}");
56    }
57    let out_dir = tmp.join(&ts);
58    let _ = std::fs::remove_dir_all(&out_dir);
59
60    let out_file = PathBuf::from(args.output.replace("{timestamp}", &ts).replace("{ts}", &ts));
61    if let Some(p) = out_file.parent()
62        && let Err(e) = std::fs::create_dir_all(p)
63    {
64        fatal!("cannot output to {}, {e}", out_file.display());
65    }
66    let mut out = match std::fs::File::create(&out_file) {
67        Ok(f) => f,
68        Err(e) => fatal!("cannot output to {}, {e}", out_file.display()),
69    };
70
71    cmd.env("ZNG_RECORD_TRACE", "")
72        .env("ZNG_RECORD_TRACE_DIR", &tmp)
73        .env("ZNG_RECORD_TRACE_FILTER", args.filter)
74        .env("ZNG_RECORD_TRACE_TIMESTAMP", &ts);
75
76    let mut cmd = match cmd.spawn() {
77        Ok(c) => c,
78        Err(e) => fatal!("cannot run, {e}"),
79    };
80
81    let code = match cmd.wait() {
82        Ok(s) => s.code().unwrap_or(0),
83        Err(e) => {
84            error!("cannot wait command exit, {e}");
85            101
86        }
87    };
88
89    if !out_dir.exists() {
90        fatal!("run did not save any trace\nnote: the feature \"trace_recorder\" must be enabled during build")
91    }
92
93    println!("merging trace files...");
94
95    out.write_all(b"[\n")
96        .unwrap_or_else(|e| fatal!("cannot write {}, {e}", out_file.display()));
97    let mut separator = "";
98
99    for trace in glob::glob(out_dir.join("*.json").display().to_string().as_str())
100        .ok()
101        .into_iter()
102        .flatten()
103    {
104        let trace = match trace {
105            Ok(t) => t,
106            Err(e) => {
107                error!("error globing trace files, {e}");
108                continue;
109            }
110        };
111        let json = match std::fs::read_to_string(&trace) {
112            Ok(s) => s,
113            Err(e) => {
114                error!("cannot read {}, {e}", trace.display());
115                continue;
116            }
117        };
118
119        let name_sys_pid = trace
120            .file_name()
121            .unwrap_or_default()
122            .to_string_lossy()
123            .strip_suffix(".json")
124            .unwrap_or_default()
125            .to_owned();
126        let name_sys_pid = match name_sys_pid.parse::<u64>() {
127            Ok(i) => i,
128            Err(_) => {
129                error!("expected only {{pid}}.json files");
130                continue;
131            }
132        };
133
134        // skip the array opening
135        let json = json.trim_start();
136        if !json.starts_with('[') {
137            error!("unknown format in {}", trace.display());
138            continue;
139        }
140        let json = &json[1..];
141
142        let mut thread_ids = HashSet::new();
143        // peek thread ids
144        {
145            let mut search = json;
146            let prefix = r#""tid":"#;
147            while let Some(i) = search.find(prefix) {
148                search = &search[i + prefix.len()..];
149                if let Some(i) = search.find(',') {
150                    let tid = &search[..i];
151                    if let Ok(tid) = tid.parse::<u64>() {
152                        thread_ids.insert(tid);
153                    }
154                    search = &search[i + 1..];
155                }
156            }
157        }
158        let mut custom_thread_id = 9000u64;
159        let mut custom_threads = HashMap::new();
160
161        let mut reader = std::io::Cursor::new(json.as_bytes());
162        loop {
163            // skip white space and commas to the next object
164            let mut pos = reader.position();
165            let mut buf = [0u8];
166            while reader.read(&mut buf).is_ok() {
167                if !b" \r\n\t,".contains(&buf[0]) {
168                    break;
169                }
170                pos = reader.position();
171            }
172            reader.set_position(pos);
173            let mut de = serde_json::Deserializer::from_reader(&mut reader);
174            match serde_json::Value::deserialize(&mut de) {
175                Ok(mut entry) => {
176                    // patch "pid" to be unique
177                    if let Some(serde_json::Value::Number(pid)) = entry.get_mut("pid") {
178                        if pid.as_u64() != Some(1) {
179                            error!("expected only pid:1 in trace file");
180                            continue;
181                        }
182                        *pid = serde_json::Number::from(name_sys_pid);
183                    }
184
185                    // convert custom entries to actual trace format
186                    match &mut entry {
187                        serde_json::Value::Object(entry) => {
188                            enum Phase {
189                                Event,
190                                Begin,
191                                End,
192                                Other,
193                            }
194                            let mut ph = Phase::Other;
195                            if let Some(serde_json::Value::String(p)) = entry.get("ph") {
196                                match p.as_str() {
197                                    "i" => ph = Phase::Event,
198                                    "B" => ph = Phase::Begin,
199                                    "E" => ph = Phase::End,
200                                    _ => {}
201                                }
202                            }
203                            if let Some(serde_json::Value::Object(args)) = entry.get_mut("args") {
204                                match ph {
205                                    Phase::Event => {
206                                        // convert the INFO message process name to actual "process_name" metadata
207                                        if let Some(serde_json::Value::String(msg)) = args.get("message")
208                                            && let Some(rest) = msg.strip_prefix("pid: ")
209                                            && let Some((sys_pid, p_name)) = rest.split_once(", name: ")
210                                            && let Ok(sys_pid) = sys_pid.parse::<u64>()
211                                            && name_sys_pid == sys_pid
212                                        {
213                                            let args = format_args!(
214                                                r#"{separator}{{"ph":"M","pid":{sys_pid},"name":"process_name","args":{{"name":"{p_name}"}}}}"#,
215                                            );
216                                            out.write_fmt(args)
217                                                .unwrap_or_else(|e| fatal!("cannot write {}, {e}", out_file.display()));
218                                        } else if let Some(serde_json::Value::String(custom_thread)) = args.remove("thread")
219                                            && let Some(serde_json::Value::Number(n)) = entry.get_mut("tid")
220                                        {
221                                            let tid = match custom_threads.entry(custom_thread) {
222                                                std::collections::hash_map::Entry::Occupied(e) => *e.get(),
223                                                std::collections::hash_map::Entry::Vacant(e) => {
224                                                    let name = serde_json::Value::String(e.key().clone());
225
226                                                    while !thread_ids.insert(custom_thread_id) {
227                                                        custom_thread_id = custom_thread_id.wrapping_add(1);
228                                                    }
229                                                    e.insert(custom_thread_id);
230
231                                                    let args = format_args!(
232                                                        r#"{separator}{{"ph":"M","pid":{name_sys_pid},"tid":{custom_thread_id},"name":"thread_name","args":{{"name":{name}}}}}"#,
233                                                    );
234                                                    out.write_fmt(args)
235                                                        .unwrap_or_else(|e| fatal!("cannot write {}, {e}", out_file.display()));
236
237                                                    custom_thread_id
238                                                }
239                                            };
240                                            *n = tid.into();
241                                        }
242                                    }
243                                    Phase::Begin | Phase::End => {
244                                        // convert "thread" arg to actual tid
245                                        if let Some(serde_json::Value::String(custom_thread)) = args.remove("thread")
246                                            && let Some(serde_json::Value::Number(n)) = entry.get_mut("tid")
247                                        {
248                                            let tid = match custom_threads.entry(custom_thread.clone()) {
249                                                std::collections::hash_map::Entry::Occupied(e) => *e.get(),
250                                                std::collections::hash_map::Entry::Vacant(e) => {
251                                                    let name = serde_json::Value::String(e.key().clone());
252
253                                                    while !thread_ids.insert(custom_thread_id) {
254                                                        custom_thread_id = custom_thread_id.wrapping_add(1);
255                                                    }
256                                                    e.insert(custom_thread_id);
257
258                                                    let args = format_args!(
259                                                        r#"{separator}{{"ph":"M","pid":{name_sys_pid},"tid":{custom_thread_id},"name":"thread_name","args":{{"name":{name}}}}}"#,
260                                                    );
261                                                    out.write_fmt(args)
262                                                        .unwrap_or_else(|e| fatal!("cannot write {}, {e}", out_file.display()));
263
264                                                    custom_thread_id
265                                                }
266                                            };
267                                            *n = tid.into();
268                                        }
269                                    }
270                                    Phase::Other => {}
271                                }
272                            }
273                        }
274                        _ => {
275                            error!("unknown format in {}", trace.display());
276                        }
277                    }
278
279                    out.write_all(separator.as_bytes())
280                        .unwrap_or_else(|e| fatal!("cannot write {}, {e}", out_file.display()));
281                    serde_json::to_writer(&mut out, &entry).unwrap_or_else(|e| fatal!("cannot write {}, {e}", out_file.display()));
282                    separator = ",\n";
283                }
284                Err(_) => break,
285            }
286        }
287    }
288
289    out.write_all(b"\n]")
290        .unwrap_or_else(|e| fatal!("cannot write {}, {e}", out_file.display()));
291    println!("saved to {}", out_file.display());
292
293    if code == 0 {
294        crate::util::exit();
295    } else {
296        // forward the exit code from the exe or cmd
297        std::process::exit(code);
298    }
299}