zng_app/
trace_recorder.rs

1#![cfg(all(
2    feature = "trace_recorder",
3    not(any(target_arch = "wasm32", target_os = "android", target_os = "ios"))
4))]
5
6//! Trace recording and data model.
7//!
8//! All tracing instrumentation in Zng projects is done using the `tracing` crate, this module uses the `tracing-chrome` crate
9//! to record traces that can be viewed in `chrome://tracing` or `ui.perfetto.dev` and can be parsed to the [`Trace`] data model.
10
11use std::{
12    collections::HashMap,
13    fmt::{self, Write as _},
14    io::{self, Read},
15    path::{Path, PathBuf},
16    time::{Duration, SystemTime},
17};
18
19use parking_lot::Mutex;
20use serde::Deserialize as _;
21use tracing_subscriber::{filter::EnvFilter, layer::SubscriberExt as _, util::SubscriberInitExt as _};
22use zng_txt::{ToTxt as _, Txt};
23
/// Represents a recorded trace.
///
/// Can be parsed from a Chrome JSON Array format trace using [`Trace::parse_chrome_trace`] and converted back
/// using [`Trace::to_chrome_trace`].
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct Trace {
    /// Traced app processes.
    pub processes: Vec<ProcessTrace>,
}
31
32/// Represents a single app process in a recorded trace.
33#[derive(Clone, Debug)]
34#[non_exhaustive]
35pub struct ProcessTrace {
36    /// System process ID.
37    pub id: u64,
38
39    /// Process name.
40    pub name: Txt,
41
42    /// Traced threads on the process.
43    pub threads: Vec<ThreadTrace>,
44
45    /// Process start instant.
46    ///
47    /// This time stamp is system dependent, if the system time changes before a second app process starts it can show as starting first.
48    ///
49    /// If [`SystemTime::UNIX_EPOCH`] if the recorder does not support time.
50    pub start: SystemTime,
51}
52
53/// Represents a single thread in an app process in a recorded trace.
54#[derive(Clone)]
55#[non_exhaustive]
56pub struct ThreadTrace {
57    /// Thread name.
58    pub name: Txt,
59
60    /// Events that happened on the thread.
61    pub events: Vec<EventTrace>,
62    /// Spans started and ended on the thread.
63    pub spans: Vec<SpanTrace>,
64}
65impl fmt::Debug for ThreadTrace {
66    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67        f.debug_struct("ThreadTrace")
68            .field("name", &self.name)
69            .field("events.len()", &self.events.len())
70            .field("spans.len()", &self.spans.len())
71            .finish()
72    }
73}
74
/// Represents a traced event.
///
/// Parsed from the Chrome trace instant (`"i"`) phase.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct EventTrace {
    /// Event info.
    pub info: Info,
    /// Moment from the recording start when this event happened.
    pub instant: Duration,
}
84
/// Represents a traced span.
///
/// Parsed from a pair of Chrome trace begin (`"B"`) and end (`"E"`) phases.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct SpanTrace {
    /// Span info.
    ///
    /// When parsed, this is the begin entry info merged with the end entry info (see [`Info::merge`]).
    pub info: Info,

    /// Moment from the recording start when this span started.
    pub start: Duration,
    /// Moment from the recording start when this span ended.
    ///
    /// When parsed, a span that has no matching end entry keeps `end == start`.
    pub end: Duration,
}
97
/// Common info traced about events and spans.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct Info {
    /// Event or span name.
    pub name: Txt,
    /// Categories.
    ///
    /// Zng recordings usually write two categories, "target" and "level".
    pub categories: Vec<Txt>,
    /// File where the event or span was traced.
    ///
    /// Empty when parsed from a trace entry without a `.file` field.
    pub file: Txt,
    /// Code line where the event or span was traced.
    ///
    /// Zero when parsed from a trace entry without a `.line` field.
    pub line: u32,
    /// Custom args traced with the event or span.
    ///
    /// Only text args are supported by the parser; the `"message"` arg is also inspected for the
    /// Zng process name and record start messages.
    pub args: HashMap<Txt, Txt>,
}
115
116impl Trace {
117    /// Read and parse a Chrome JSON Array format trace.
118    ///
119    /// See [`parse_chrome_trace`] for more details.
120    ///
121    /// [`parse_chrome_trace`]: Self::parse_chrome_trace
122    pub fn read_chrome_trace(json_path: impl AsRef<Path>) -> io::Result<Self> {
123        let json = std::fs::read_to_string(json_path)?;
124        let trace = Self::parse_chrome_trace(&json)?;
125        Ok(trace)
126    }
127
128    /// Parse a Chrome JSON Array format trace.
129    ///
130    /// Only supports the "phases" emitted by `tracing-chrome` in `TraceStyle::Threaded` mode, those are `B, E, i, M` for `M` only
131    /// supports `thread_name` metadata. Also parses the custom messages that define the process name and start timestamp as defined
132    /// by the `zng::app::trace_recorder` documentation.
133    pub fn parse_chrome_trace(json: &str) -> io::Result<Self> {
134        fn invalid_data(msg: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> io::Error {
135            io::Error::new(io::ErrorKind::InvalidData, msg)
136        }
137
138        // skip the array opening
139        let json = json.trim_start();
140        if !json.starts_with('[') {
141            return Err(invalid_data("expected JSON array"));
142        }
143        let json = &json[1..];
144
145        enum Phase {
146            Begin,
147            End,
148            Event,
149        }
150        struct Entry {
151            phase: Phase,
152            pid: u64,
153            tid: u64,
154            ts: Duration,
155            name: Txt,
156            cat: Vec<Txt>,
157            file: Txt,
158            line: u32,
159            args: HashMap<Txt, Txt>,
160        }
161        let mut process_sys_pid = HashMap::new();
162        let mut process_names = HashMap::new();
163        let mut process_record_start = HashMap::new();
164        let mut thread_names = HashMap::new();
165        let mut entries = vec![];
166
167        let mut reader = std::io::Cursor::new(json.as_bytes());
168        loop {
169            // skip white space and commas to the next object
170            let mut pos = reader.position();
171            let mut buf = [0u8];
172            while reader.read(&mut buf).is_ok() {
173                if !b" \r\n\t,".contains(&buf[0]) {
174                    break;
175                }
176                pos = reader.position();
177            }
178            reader.set_position(pos);
179            let mut de = serde_json::Deserializer::from_reader(&mut reader);
180            match serde_json::Value::deserialize(&mut de) {
181                Ok(entry) => match entry {
182                    serde_json::Value::Object(map) => {
183                        let phase_str = match map.get("ph") {
184                            Some(serde_json::Value::String(ph)) => match ph.as_str() {
185                                "B" | "E" | "i" | "M" => ph.as_str(),
186                                u => {
187                                    tracing::error!("ignoring unknown or unsupported phase `{u:?}`");
188                                    continue;
189                                }
190                            },
191                            _ => return Err(invalid_data("expected \"ph\"")),
192                        };
193                        let pid = match map.get("pid") {
194                            Some(serde_json::Value::Number(n)) => match n.as_u64() {
195                                Some(pid) => pid,
196                                None => return Err(invalid_data("expected \"pid\"")),
197                            },
198                            _ => return Err(invalid_data("expected \"pid\"")),
199                        };
200                        let name = match map.get("name") {
201                            Some(serde_json::Value::String(name)) => name.to_txt(),
202                            _ => return Err(invalid_data("expected \"name\"")),
203                        };
204
205                        let args: HashMap<Txt, Txt> = match map.get("args") {
206                            Some(a) => match serde_json::from_value(a.clone()) {
207                                Ok(a) => a,
208                                Err(e) => {
209                                    tracing::error!("only simple text args are supported, {e}");
210                                    continue;
211                                }
212                            },
213                            _ => HashMap::new(),
214                        };
215
216                        if phase_str == "M" && name == "process_name" {
217                            if let Some(n) = args.get("name") {
218                                process_names.insert(pid, n.to_txt());
219                            }
220                            continue;
221                        }
222
223                        let tid = match map.get("tid") {
224                            Some(serde_json::Value::Number(n)) => match n.as_u64() {
225                                Some(tid) => tid,
226                                None => return Err(invalid_data("expected \"tid\"")),
227                            },
228                            _ => return Err(invalid_data("expected \"tid\"")),
229                        };
230
231                        let phase = match phase_str {
232                            "B" => Phase::Begin,
233                            "E" => Phase::End,
234                            "i" => Phase::Event,
235                            "M" => {
236                                if name == "thread_name"
237                                    && let Some(n) = args.get("name")
238                                {
239                                    thread_names.insert(tid, n.to_txt());
240                                }
241                                continue;
242                            }
243                            _ => unreachable!(),
244                        };
245
246                        let ts = match map.get("ts") {
247                            Some(serde_json::Value::Number(ts)) => match ts.as_f64() {
248                                Some(ts) => Duration::from_nanos((ts * 1000.0).round() as u64),
249                                None => return Err(invalid_data("expected \"ts\"")),
250                            },
251                            _ => return Err(invalid_data("expected \"ts\"")),
252                        };
253                        let cat = match map.get("cat") {
254                            Some(serde_json::Value::String(cat)) => cat.split(',').map(|c| c.trim().to_txt()).collect(),
255                            _ => vec![],
256                        };
257                        let file = match map.get(".file") {
258                            Some(serde_json::Value::String(file)) => file.to_txt(),
259                            _ => Txt::from_static(""),
260                        };
261                        let line = match map.get(".line") {
262                            Some(serde_json::Value::Number(line)) => line.as_u64().unwrap_or(0) as u32,
263                            _ => 0,
264                        };
265
266                        if let Some(msg) = args.get("message") {
267                            if let Some(process_ts) = msg.strip_prefix("zng-record-start: ") {
268                                if let Ok(process_ts) = process_ts.parse::<u64>() {
269                                    process_record_start.insert(pid, SystemTime::UNIX_EPOCH + Duration::from_micros(process_ts));
270                                }
271                            } else if let Some(rest) = msg.strip_prefix("pid: ")
272                                && let Some((sys_pid, p_name)) = rest.split_once(", name: ")
273                                && let Ok(sys_pid) = sys_pid.parse::<u64>()
274                            {
275                                process_sys_pid.insert(pid, sys_pid);
276                                process_names.insert(pid, p_name.to_txt());
277                            }
278                        }
279
280                        entries.push(Entry {
281                            phase,
282                            pid,
283                            tid,
284                            ts,
285                            name,
286                            cat,
287                            file,
288                            line,
289                            args,
290                        });
291                    }
292                    _ => return Err(invalid_data("expected JSON array of objects")),
293                },
294                Err(_) => {
295                    // EOF
296                    break;
297                }
298            }
299        }
300
301        let mut out = Trace { processes: vec![] };
302
303        for entry in entries {
304            let sys_pid = *process_sys_pid.entry(entry.pid).or_insert(entry.pid);
305            let process = if let Some(p) = out.processes.iter_mut().find(|p| p.id == sys_pid) {
306                p
307            } else {
308                out.processes.push(ProcessTrace {
309                    id: sys_pid,
310                    name: process_names.entry(entry.pid).or_insert_with(|| sys_pid.to_txt()).clone(),
311                    threads: vec![],
312                    start: process_record_start.get(&entry.pid).copied().unwrap_or(SystemTime::UNIX_EPOCH),
313                });
314                out.processes.last_mut().unwrap()
315            };
316
317            let thread_name = thread_names.entry(entry.tid).or_insert_with(|| entry.tid.to_txt()).clone();
318            let thread = if let Some(t) = process.threads.iter_mut().find(|t| t.name == thread_name) {
319                t
320            } else {
321                process.threads.push(ThreadTrace {
322                    name: thread_name,
323                    events: vec![],
324                    spans: vec![],
325                });
326                process.threads.last_mut().unwrap()
327            };
328
329            fn entry_to_info(entry: Entry) -> Info {
330                Info {
331                    name: entry.name,
332                    categories: entry.cat,
333                    file: entry.file,
334                    line: entry.line,
335                    args: entry.args,
336                }
337            }
338
339            match entry.phase {
340                Phase::Begin => thread.spans.push(SpanTrace {
341                    start: entry.ts,
342                    end: entry.ts,
343                    info: entry_to_info(entry),
344                }),
345                Phase::End => {
346                    let end = entry.ts;
347                    let info = entry_to_info(entry);
348                    // recording always closes inner first (example: [begin1,begin2,end2,end1])
349                    // this will search [begin1,begin2] in reverse for end2 and close that first, even if begin1 has the same name
350                    if let Some(open) = thread.spans.iter_mut().rev().find(|s| s.start == s.end && s.info.name == info.name) {
351                        open.end = end;
352                        if open.start == open.end {
353                            // timing is in microseconds so we can use 1ns here to help the logic
354                            open.end += Duration::from_nanos(1);
355                        }
356                        open.info.merge(info);
357                    }
358                }
359                Phase::Event => thread.events.push(EventTrace {
360                    instant: entry.ts,
361                    info: entry_to_info(entry),
362                }),
363            }
364        }
365
366        Ok(out)
367    }
368
369    /// Convert the trace to Chrome JSON Array format.
370    pub fn to_chrome_trace(&self) -> Txt {
371        let mut out = String::new();
372
373        let _ = writeln!(&mut out, "[");
374
375        let mut sep = "";
376        for p in &self.processes {
377            let _ = write!(
378                &mut out,
379                r#"{sep}{{"ph":"M","pid":{},"name":"process_name","args":{{"name":"{}"}}}}"#,
380                p.id, p.name
381            );
382            sep = ",\n";
383            for (tid, t) in p.threads.iter().enumerate() {
384                let _ = write!(
385                    &mut out,
386                    r#"{sep}{{"ph":"M","pid":{}, "tid":{tid},"name":"process_name","args":{{"name":"{}"}}}}"#,
387                    p.id, t.name
388                );
389
390                let mut items = Vec::with_capacity(t.events.len() + t.spans.len() * 2);
391                for ev in &t.events {
392                    let obj = serde_json::json!({
393                        "ph": "i",
394                        "s": "t",
395                        "ts": (ev.instant.as_nanos() as f64 / 1000.0),
396                        "pid": p.id,
397                        "tid": tid,
398                        "name": ev.info.name,
399                        "cat": ev.info.categories.iter().fold(String::new(), |a, b| format!("{a},{b}")),
400                        "args": ev.info.args,
401                        ".file": ev.info.file,
402                        ".line": ev.info.line,
403                    });
404                    items.push((ev.instant, obj));
405                }
406                for sp in &t.spans {
407                    let start = serde_json::json!({
408                        "ph": "B",
409                        "s": "t",
410                        "ts": (sp.start.as_nanos() as f64 / 1000.0),
411                        "pid": p.id,
412                        "tid": tid,
413                        "name": sp.info.name,
414                        "cat": sp.info.categories.iter().fold(String::new(), |a, b| format!("{a},{b}")),
415                        "args": sp.info.args,
416                        ".file": sp.info.file,
417                        ".line": sp.info.line,
418                    });
419                    items.push((sp.start, start));
420
421                    let end = serde_json::json!({
422                        "ph": "E",
423                        "s": "t",
424                        "ts": (sp.end.as_nanos() as f64 / 1000.0),
425                        "pid": p.id,
426                        "tid": tid,
427                        "name": sp.info.name,
428                        "cat": sp.info.categories.iter().fold(String::new(), |a, b| format!("{a},{b}")),
429                        "args": sp.info.args,
430                        ".file": sp.info.file,
431                        ".line": sp.info.line,
432                    });
433                    items.push((sp.end, end));
434                }
435
436                items.sort_by(|a, b| a.0.cmp(&b.0));
437
438                for (_, item) in items {
439                    let item = serde_json::to_string(&item).unwrap();
440                    let _ = write!(&mut out, "{sep}{item}");
441                }
442            }
443        }
444
445        let _ = writeln!(&mut out, "]");
446        out.to_txt()
447    }
448
449    /// Convert and write the trace to Chrome JSON Array format.
450    pub fn write_chrome_trace(&self, json_path: impl AsRef<Path>) -> io::Result<()> {
451        std::fs::write(json_path, self.to_chrome_trace().as_str().as_bytes())
452    }
453
454    /// Merge `other` into this.
455    pub fn merge(&mut self, other: Self) {
456        for p in other.processes {
457            if let Some(ep) = self.processes.iter_mut().find(|ep| ep.id == p.id && ep.name == p.name) {
458                ep.merge(p);
459            } else {
460                self.processes.push(p);
461            }
462        }
463    }
464
465    /// Sort processes processes and threads by start time then name, events by instant and spans by start.
466    pub fn sort(&mut self) {
467        self.processes.sort_by(|a, b| a.start.cmp(&b.start).then(a.name.cmp(&b.name)));
468        for p in &mut self.processes {
469            p.sort();
470        }
471    }
472}
473
474impl ProcessTrace {
475    /// Merge `other` into this.
476    pub fn merge(&mut self, other: Self) {
477        for t in other.threads {
478            if let Some(et) = self.threads.iter_mut().find(|et| et.name == t.name) {
479                et.merge(t);
480            } else {
481                self.threads.push(t);
482            }
483        }
484    }
485
486    /// Sort threads by name, events by instant and spans by start.
487    pub fn sort(&mut self) {
488        self.threads.sort_by(|a, b| a.start().cmp(&b.start()).then(a.name.cmp(&b.name)));
489        for t in &mut self.threads {
490            t.sort();
491        }
492    }
493}
494
495impl ThreadTrace {
496    /// Gets the minimum event or span start in the thread.
497    pub fn start(&self) -> Duration {
498        self.events
499            .iter()
500            .map(|e| e.instant)
501            .min()
502            .unwrap_or(Duration::MAX)
503            .min(self.spans.iter().map(|e| e.start).min().unwrap_or(Duration::MAX))
504    }
505
506    /// Merge `other` into this.
507    pub fn merge(&mut self, mut other: Self) {
508        self.events.append(&mut other.events);
509        self.spans.append(&mut other.spans);
510    }
511
512    /// Sort events by instant and spans by start (then reverse end).
513    ///
514    /// After sorting if a span starts within the start..=end of the previous it is "inside" it.
515    pub fn sort(&mut self) {
516        self.events.sort_by(|a, b| a.instant.cmp(&b.instant));
517        self.spans.sort_by(|a, b| a.start.cmp(&b.start).then(b.start.cmp(&a.start)));
518    }
519}
520
521impl Info {
522    /// Merge `other` into this.
523    pub fn merge(&mut self, info: Info) {
524        if !info.file.is_empty() {
525            self.file = info.file;
526            self.line = info.line;
527        }
528        self.args.extend(info.args);
529    }
530}
531
532/// Starts recording, stops on process exit or on [`stop_recording`].
533///
534/// Note that this is called automatically on startup if the `"ZNG_RECORD_TRACE"` environment variable is set and that is
535/// the recommended way of enabling recording as it record all processes not just the calling process.
536///
537/// # Config and Output
538///
539/// See the `zng::app::trace_recorder` module documentation for details on how to configure the recording and the output file structure.
540///
541/// # Panics
542///
543/// Panics if another `tracing` subscriber was already inited.
544///
545/// Note that this can cause panics on any subsequent attempt to init subscribers, no other log subscriber must run after recording starts,
546/// including attempts to restart recording after stopping.
547///
548/// Panics cannot write to the output dir.
549pub fn start_recording(output_dir: Option<PathBuf>) {
550    let mut rec = recording();
551    if rec.is_some() {
552        // already recording
553        return;
554    }
555
556    let process_start = std::time::SystemTime::now()
557        .duration_since(std::time::SystemTime::UNIX_EPOCH)
558        .expect("cannot define process start timestamp")
559        .as_micros();
560
561    let output_dir = output_dir.unwrap_or_else(|| std::env::current_dir().expect("`current_dir` error").join("zng-trace"));
562
563    // first process sets the timestamp
564    let timestamp = match std::env::var("ZNG_RECORD_TRACE_TIMESTAMP") {
565        Ok(t) => t,
566        Err(_) => {
567            let t = process_start.to_string();
568            // SAFETY: safe, only read by this pure Rust code in subsequent started processes.
569            unsafe {
570                std::env::set_var("ZNG_RECORD_TRACE_TIMESTAMP", t.clone());
571            }
572            t
573        }
574    };
575
576    let output_dir = output_dir.join(timestamp);
577    std::fs::create_dir_all(&output_dir).expect("cannot create `output_dir`");
578    let output_file = output_dir.join(format!("{}.json", std::process::id()));
579
580    let (chrome_layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
581        .include_args(true)
582        .file(output_file)
583        .category_fn(Box::new(|es| match es {
584            tracing_chrome::EventOrSpan::Event(event) => format!("{},{}", event.metadata().target(), event.metadata().level()),
585            tracing_chrome::EventOrSpan::Span(span_ref) => format!("{},{}", span_ref.metadata().target(), span_ref.metadata().level()),
586        }))
587        .build();
588    *rec = Some(guard);
589
590    let env_layer = EnvFilter::try_from_env("ZNG_RECORD_TRACE_FILTER")
591        .or_else(|_| EnvFilter::try_from_default_env())
592        .unwrap_or_else(|_| EnvFilter::new("trace"));
593
594    tracing_subscriber::registry().with(env_layer).with(chrome_layer).init();
595    zng_env::on_process_exit(|_| stop_recording());
596
597    tracing::info!("zng-record-start: {process_start}");
598}
599
600/// Stops recording and flushes.
601///
602/// Note that this is called automatically on process exit.
603pub fn stop_recording() {
604    *recording() = None;
605}
606
607zng_env::on_process_start!(|_| {
608    if std::env::var("ZNG_RECORD_TRACE").is_ok() {
609        start_recording(std::env::var("ZNG_RECORD_TRACE_DIR").ok().map(PathBuf::from));
610    }
611});
612
613zng_app_context::hot_static! {
614    static RECORDING: Mutex<Option<tracing_chrome::FlushGuard>> = Mutex::new(None);
615}
616fn recording() -> parking_lot::MutexGuard<'static, Option<tracing_chrome::FlushGuard>> {
617    zng_app_context::hot_static_ref!(RECORDING).lock()
618}