zng_app/
trace_recorder.rs

1#![cfg(all(
2    feature = "trace_recorder",
3    not(any(target_arch = "wasm32", target_os = "android", target_os = "ios"))
4))]
5
6//! Trace recording and data model.
7//!
8//! All tracing instrumentation in Zng projects is done using the `tracing` crate, this module uses the `tracing-chrome` crate
9//! to record traces that can be viewed in `chrome://tracing` or `ui.perfetto.dev` and can be parsed to the [`Trace`] data model.
10
11use std::{
12    collections::HashMap,
13    fmt::{self, Write as _},
14    io::{self, Read},
15    path::{Path, PathBuf},
16    time::{Duration, SystemTime},
17};
18
19use parking_lot::Mutex;
20use serde::Deserialize as _;
21use tracing_subscriber::{filter::EnvFilter, layer::SubscriberExt as _, util::SubscriberInitExt as _};
22use zng_txt::{ToTxt as _, Txt};
23
/// Represents a recorded trace.
///
/// The recorded data is grouped by app process, see [`ProcessTrace`].
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct Trace {
    /// Traced app processes.
    pub processes: Vec<ProcessTrace>,
}
31
/// Represents a single app process in a recorded trace.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ProcessTrace {
    /// System process ID.
    pub id: u64,

    /// Process name.
    pub name: Txt,

    /// Traced threads on the process.
    pub threads: Vec<ThreadTrace>,

    /// Process start instant.
    ///
    /// This time stamp is system dependent, if the system time changes before a second app process starts it can show as starting first.
    ///
    /// Is [`SystemTime::UNIX_EPOCH`] if the recorder does not support time.
    pub start: SystemTime,
}
52
/// Represents a single thread in an app process in a recorded trace.
///
/// The [`fmt::Debug`] implementation only prints the name and the number of events and spans.
#[derive(Clone)]
#[non_exhaustive]
pub struct ThreadTrace {
    /// Thread name.
    pub name: Txt,

    /// Events that happened on the thread.
    pub events: Vec<EventTrace>,
    /// Spans started and ended on the thread.
    pub spans: Vec<SpanTrace>,
}
65impl fmt::Debug for ThreadTrace {
66    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67        f.debug_struct("ThreadTrace")
68            .field("name", &self.name)
69            .field("events.len()", &self.events.len())
70            .field("spans.len()", &self.spans.len())
71            .finish()
72    }
73}
74
/// Represents a traced event.
///
/// An event is a single instant in the recording, see [`SpanTrace`] for traced time ranges.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct EventTrace {
    /// Event info.
    pub info: Info,
    /// Moment from the recording start when this event happened.
    pub instant: Duration,
}
84
/// Represents a traced span.
///
/// Spans parsed by [`Trace::parse_chrome_trace`] that never received an end entry have `start == end`, spans that
/// were closed always have `end > start`.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct SpanTrace {
    /// Span info.
    pub info: Info,

    /// Moment from the recording start when this span started.
    pub start: Duration,
    /// Moment from the recording start when this span ended.
    pub end: Duration,
}
97
/// Common info traced about events and spans.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct Info {
    /// Event or span name.
    pub name: Txt,
    /// Categories.
    ///
    /// Zng recordings usually write two categories, "target" and "level".
    pub categories: Vec<Txt>,
    /// File where the event or span was traced.
    ///
    /// Is empty if [`Trace::parse_chrome_trace`] did not find a file in the entry.
    pub file: Txt,
    /// Code line where the event or span was traced.
    ///
    /// Is `0` if [`Trace::parse_chrome_trace`] did not find a line in the entry.
    pub line: u32,
    /// Custom args traced with the event or span.
    pub args: HashMap<Txt, Txt>,
}
115
116impl Trace {
117    /// Read and parse a Chrome JSON Array format trace.
118    ///
119    /// See [`parse_chrome_trace`] for more details.
120    ///
121    /// [`parse_chrome_trace`]: Self::parse_chrome_trace
122    pub fn read_chrome_trace(json_path: impl AsRef<Path>) -> io::Result<Self> {
123        let json = std::fs::read_to_string(json_path)?;
124        let trace = Self::parse_chrome_trace(&json)?;
125        Ok(trace)
126    }
127
128    /// Parse a Chrome JSON Array format trace.
129    ///
130    /// Only supports the "phases" emitted by `tracing-chrome` in `TraceStyle::Threaded` mode, those are `B, E, i, M` for `M` only
131    /// supports `thread_name` metadata. Also parses the custom messages that define the process name and start timestamp as defined
132    /// by the `zng::app::trace_recorder` documentation.
133    pub fn parse_chrome_trace(json: &str) -> io::Result<Self> {
134        fn invalid_data(msg: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> io::Error {
135            io::Error::new(io::ErrorKind::InvalidData, msg)
136        }
137
138        // skip the array opening
139        let json = json.trim_start();
140        if !json.starts_with('[') {
141            return Err(invalid_data("expected JSON array"));
142        }
143        let json = &json[1..];
144
145        enum Phase {
146            Begin,
147            End,
148            Event,
149        }
150        struct Entry {
151            phase: Phase,
152            pid: u64,
153            tid: u64,
154            ts: Duration,
155            name: Txt,
156            cat: Vec<Txt>,
157            file: Txt,
158            line: u32,
159            args: HashMap<Txt, Txt>,
160        }
161        let mut process_sys_pid = HashMap::new();
162        let mut process_names = HashMap::new();
163        let mut process_record_start = HashMap::new();
164        let mut thread_names = HashMap::new();
165        let mut entries = vec![];
166
167        let mut reader = std::io::Cursor::new(json.as_bytes());
168        loop {
169            // skip white space and commas to the next object
170            let mut pos = reader.position();
171            let mut buf = [0u8];
172            while reader.read(&mut buf).is_ok() {
173                if !b" \r\n\t,".contains(&buf[0]) {
174                    break;
175                }
176                pos = reader.position();
177            }
178            reader.set_position(pos);
179            let mut de = serde_json::Deserializer::from_reader(&mut reader);
180            match serde_json::Value::deserialize(&mut de) {
181                Ok(entry) => match entry {
182                    serde_json::Value::Object(map) => {
183                        let phase_str = match map.get("ph") {
184                            Some(serde_json::Value::String(ph)) => match ph.as_str() {
185                                "B" | "E" | "i" | "M" => ph.as_str(),
186                                u => {
187                                    tracing::error!("ignoring unknown or unsupported phase `{u:?}`");
188                                    continue;
189                                }
190                            },
191                            _ => return Err(invalid_data("expected \"ph\"")),
192                        };
193                        let pid = match map.get("pid") {
194                            Some(serde_json::Value::Number(n)) => match n.as_u64() {
195                                Some(pid) => pid,
196                                None => return Err(invalid_data("expected \"pid\"")),
197                            },
198                            _ => return Err(invalid_data("expected \"pid\"")),
199                        };
200                        let name = match map.get("name") {
201                            Some(serde_json::Value::String(name)) => name.to_txt(),
202                            _ => return Err(invalid_data("expected \"name\"")),
203                        };
204
205                        let args: HashMap<Txt, Txt> = match map.get("args") {
206                            Some(a) => match serde_json::from_value(a.clone()) {
207                                Ok(a) => a,
208                                Err(e) => {
209                                    tracing::error!("only simple text args are supported, {e}");
210                                    continue;
211                                }
212                            },
213                            _ => HashMap::new(),
214                        };
215
216                        if phase_str == "M" && name == "process_name" {
217                            if let Some(n) = args.get("name") {
218                                process_names.insert(pid, n.to_txt());
219                            }
220                            continue;
221                        }
222
223                        let tid = match map.get("tid") {
224                            Some(serde_json::Value::Number(n)) => match n.as_u64() {
225                                Some(tid) => tid,
226                                None => return Err(invalid_data("expected \"tid\"")),
227                            },
228                            _ => return Err(invalid_data("expected \"tid\"")),
229                        };
230
231                        let phase = match phase_str {
232                            "B" => Phase::Begin,
233                            "E" => Phase::End,
234                            "i" => Phase::Event,
235                            "M" => {
236                                if name == "thread_name"
237                                    && let Some(n) = args.get("name")
238                                {
239                                    thread_names.insert(tid, n.to_txt());
240                                }
241                                continue;
242                            }
243                            _ => unreachable!(),
244                        };
245
246                        let ts = match map.get("ts") {
247                            Some(serde_json::Value::Number(ts)) => match ts.as_f64() {
248                                Some(ts) => Duration::from_nanos((ts * 1000.0).round() as u64),
249                                None => return Err(invalid_data("expected \"ts\"")),
250                            },
251                            _ => return Err(invalid_data("expected \"ts\"")),
252                        };
253                        let cat = match map.get("cat") {
254                            Some(serde_json::Value::String(cat)) => cat.split(',').map(|c| c.trim().to_txt()).collect(),
255                            _ => vec![],
256                        };
257                        let file = match map.get(".file") {
258                            Some(serde_json::Value::String(file)) => file.to_txt(),
259                            _ => Txt::from_static(""),
260                        };
261                        let line = match map.get(".line") {
262                            Some(serde_json::Value::Number(line)) => line.as_u64().unwrap_or(0) as u32,
263                            _ => 0,
264                        };
265
266                        if let Some(msg) = args.get("message") {
267                            if let Some(process_ts) = msg.strip_prefix("zng-record-start: ") {
268                                if let Ok(process_ts) = process_ts.parse::<u64>() {
269                                    process_record_start.insert(pid, SystemTime::UNIX_EPOCH + Duration::from_micros(process_ts));
270                                }
271                            } else if let Some(rest) = msg.strip_prefix("pid: ")
272                                && let Some((sys_pid, p_name)) = rest.split_once(", name: ")
273                                && let Ok(sys_pid) = sys_pid.parse::<u64>()
274                            {
275                                process_sys_pid.insert(pid, sys_pid);
276                                process_names.insert(pid, p_name.to_txt());
277                            }
278                        }
279
280                        entries.push(Entry {
281                            phase,
282                            pid,
283                            tid,
284                            ts,
285                            name,
286                            cat,
287                            file,
288                            line,
289                            args,
290                        });
291                    }
292                    _ => return Err(invalid_data("expected JSON array of objects")),
293                },
294                Err(_) => {
295                    // EOF
296                    break;
297                }
298            }
299        }
300
301        let mut out = Trace { processes: vec![] };
302
303        for mut entry in entries {
304            let sys_pid = *process_sys_pid.entry(entry.pid).or_insert(entry.pid);
305            let process = if let Some(p) = out.processes.iter_mut().find(|p| p.id == sys_pid) {
306                p
307            } else {
308                out.processes.push(ProcessTrace {
309                    id: sys_pid,
310                    name: process_names.entry(entry.pid).or_insert_with(|| sys_pid.to_txt()).clone(),
311                    threads: vec![],
312                    start: process_record_start.get(&entry.pid).copied().unwrap_or(SystemTime::UNIX_EPOCH),
313                });
314                out.processes.last_mut().unwrap()
315            };
316
317            let thread_name = if let Some(custom_name) = entry.args.remove("thread") {
318                custom_name.clone()
319            } else {
320                thread_names.entry(entry.tid).or_insert_with(|| entry.tid.to_txt()).clone()
321            };
322            let thread = if let Some(t) = process.threads.iter_mut().find(|t| t.name == thread_name) {
323                t
324            } else {
325                process.threads.push(ThreadTrace {
326                    name: thread_name,
327                    events: vec![],
328                    spans: vec![],
329                });
330                process.threads.last_mut().unwrap()
331            };
332
333            fn entry_to_info(entry: Entry) -> Info {
334                Info {
335                    name: entry.name,
336                    categories: entry.cat,
337                    file: entry.file,
338                    line: entry.line,
339                    args: entry.args,
340                }
341            }
342
343            match entry.phase {
344                Phase::Begin => thread.spans.push(SpanTrace {
345                    start: entry.ts,
346                    end: entry.ts,
347                    info: entry_to_info(entry),
348                }),
349                Phase::End => {
350                    let end = entry.ts;
351                    let info = entry_to_info(entry);
352                    // recording always closes inner first (example: [begin1,begin2,end2,end1])
353                    // this will search [begin1,begin2] in reverse for end2 and close that first, even if begin1 has the same name
354                    if let Some(open) = thread.spans.iter_mut().rev().find(|s| s.start == s.end && s.info.name == info.name) {
355                        open.end = end;
356                        if open.start == open.end {
357                            // timing is in microseconds so we can use 1ns here to help the logic
358                            open.end += Duration::from_nanos(1);
359                        }
360                        open.info.merge(info);
361                    }
362                }
363                Phase::Event => thread.events.push(EventTrace {
364                    instant: entry.ts,
365                    info: entry_to_info(entry),
366                }),
367            }
368        }
369
370        Ok(out)
371    }
372
373    /// Convert the trace to Chrome JSON Array format.
374    pub fn to_chrome_trace(&self) -> Txt {
375        let mut out = String::new();
376
377        let _ = writeln!(&mut out, "[");
378
379        let mut sep = "";
380        for p in &self.processes {
381            let _ = write!(
382                &mut out,
383                r#"{sep}{{"ph":"M","pid":{},"name":"process_name","args":{{"name":"{}"}}}}"#,
384                p.id, p.name
385            );
386            sep = ",\n";
387            for (tid, t) in p.threads.iter().enumerate() {
388                let _ = write!(
389                    &mut out,
390                    r#"{sep}{{"ph":"M","pid":{}, "tid":{tid},"name":"process_name","args":{{"name":"{}"}}}}"#,
391                    p.id, t.name
392                );
393
394                let mut items = Vec::with_capacity(t.events.len() + t.spans.len() * 2);
395                for ev in &t.events {
396                    let obj = serde_json::json!({
397                        "ph": "i",
398                        "s": "t",
399                        "ts": (ev.instant.as_nanos() as f64 / 1000.0),
400                        "pid": p.id,
401                        "tid": tid,
402                        "name": ev.info.name,
403                        "cat": ev.info.categories.iter().fold(String::new(), |a, b| format!("{a},{b}")),
404                        "args": ev.info.args,
405                        ".file": ev.info.file,
406                        ".line": ev.info.line,
407                    });
408                    items.push((ev.instant, obj));
409                }
410                for sp in &t.spans {
411                    let start = serde_json::json!({
412                        "ph": "B",
413                        "s": "t",
414                        "ts": (sp.start.as_nanos() as f64 / 1000.0),
415                        "pid": p.id,
416                        "tid": tid,
417                        "name": sp.info.name,
418                        "cat": sp.info.categories.iter().fold(String::new(), |a, b| format!("{a},{b}")),
419                        "args": sp.info.args,
420                        ".file": sp.info.file,
421                        ".line": sp.info.line,
422                    });
423                    items.push((sp.start, start));
424
425                    let end = serde_json::json!({
426                        "ph": "E",
427                        "s": "t",
428                        "ts": (sp.end.as_nanos() as f64 / 1000.0),
429                        "pid": p.id,
430                        "tid": tid,
431                        "name": sp.info.name,
432                        "cat": sp.info.categories.iter().fold(String::new(), |a, b| format!("{a},{b}")),
433                        "args": sp.info.args,
434                        ".file": sp.info.file,
435                        ".line": sp.info.line,
436                    });
437                    items.push((sp.end, end));
438                }
439
440                items.sort_by(|a, b| a.0.cmp(&b.0));
441
442                for (_, item) in items {
443                    let item = serde_json::to_string(&item).unwrap();
444                    let _ = write!(&mut out, "{sep}{item}");
445                }
446            }
447        }
448
449        let _ = writeln!(&mut out, "]");
450        out.to_txt()
451    }
452
453    /// Convert and write the trace to Chrome JSON Array format.
454    pub fn write_chrome_trace(&self, json_path: impl AsRef<Path>) -> io::Result<()> {
455        std::fs::write(json_path, self.to_chrome_trace().as_str().as_bytes())
456    }
457
458    /// Merge `other` into this.
459    pub fn merge(&mut self, other: Self) {
460        for p in other.processes {
461            if let Some(ep) = self.processes.iter_mut().find(|ep| ep.id == p.id && ep.name == p.name) {
462                ep.merge(p);
463            } else {
464                self.processes.push(p);
465            }
466        }
467    }
468
469    /// Sort processes processes and threads by start time then name, events by instant and spans by start.
470    pub fn sort(&mut self) {
471        self.processes.sort_by(|a, b| a.start.cmp(&b.start).then(a.name.cmp(&b.name)));
472        for p in &mut self.processes {
473            p.sort();
474        }
475    }
476}
477
478impl ProcessTrace {
479    /// Merge `other` into this.
480    pub fn merge(&mut self, other: Self) {
481        for t in other.threads {
482            if let Some(et) = self.threads.iter_mut().find(|et| et.name == t.name) {
483                et.merge(t);
484            } else {
485                self.threads.push(t);
486            }
487        }
488    }
489
490    /// Sort threads by name, events by instant and spans by start.
491    pub fn sort(&mut self) {
492        self.threads.sort_by(|a, b| a.start().cmp(&b.start()).then(a.name.cmp(&b.name)));
493        for t in &mut self.threads {
494            t.sort();
495        }
496    }
497}
498
499impl ThreadTrace {
500    /// Gets the minimum event or span start in the thread.
501    pub fn start(&self) -> Duration {
502        self.events
503            .iter()
504            .map(|e| e.instant)
505            .min()
506            .unwrap_or(Duration::MAX)
507            .min(self.spans.iter().map(|e| e.start).min().unwrap_or(Duration::MAX))
508    }
509
510    /// Merge `other` into this.
511    pub fn merge(&mut self, mut other: Self) {
512        self.events.append(&mut other.events);
513        self.spans.append(&mut other.spans);
514    }
515
516    /// Sort events by instant and spans by start (then reverse end).
517    ///
518    /// After sorting if a span starts within the start..=end of the previous it is "inside" it.
519    pub fn sort(&mut self) {
520        self.events.sort_by(|a, b| a.instant.cmp(&b.instant));
521        self.spans.sort_by(|a, b| a.start.cmp(&b.start).then(b.start.cmp(&a.start)));
522    }
523}
524
525impl Info {
526    /// Merge `other` into this.
527    pub fn merge(&mut self, info: Info) {
528        if !info.file.is_empty() {
529            self.file = info.file;
530            self.line = info.line;
531        }
532        self.args.extend(info.args);
533    }
534}
535
536/// Starts recording, stops on process exit or on [`stop_recording`].
537///
538/// Note that this is called automatically on startup if the `"ZNG_RECORD_TRACE"` environment variable is set and that is
539/// the recommended way of enabling recording as it record all processes not just the calling process.
540///
541/// # Config and Output
542///
543/// See the `zng::app::trace_recorder` module documentation for details on how to configure the recording and the output file structure.
544///
545/// # Panics
546///
547/// Panics if another `tracing` subscriber was already inited.
548///
549/// Note that this can cause panics on any subsequent attempt to init subscribers, no other log subscriber must run after recording starts,
550/// including attempts to restart recording after stopping.
551///
552/// Panics cannot write to the output dir.
553pub fn start_recording(output_dir: Option<PathBuf>) {
554    let mut rec = recording();
555    if rec.is_some() {
556        // already recording
557        return;
558    }
559
560    let process_start = std::time::SystemTime::now()
561        .duration_since(std::time::SystemTime::UNIX_EPOCH)
562        .expect("cannot define process start timestamp")
563        .as_micros();
564
565    let output_dir = output_dir.unwrap_or_else(|| std::env::current_dir().expect("`current_dir` error").join("zng-trace"));
566
567    // first process sets the timestamp
568    let timestamp = match std::env::var("ZNG_RECORD_TRACE_TIMESTAMP") {
569        Ok(t) => t,
570        Err(_) => {
571            let t = process_start.to_string();
572            // SAFETY: safe, only read by this pure Rust code in subsequent started processes.
573            unsafe {
574                std::env::set_var("ZNG_RECORD_TRACE_TIMESTAMP", t.clone());
575            }
576            t
577        }
578    };
579
580    let output_dir = output_dir.join(timestamp);
581    std::fs::create_dir_all(&output_dir).expect("cannot create `output_dir`");
582    let output_file = output_dir.join(format!("{}.json", std::process::id()));
583
584    let (chrome_layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
585        .include_args(true)
586        .file(output_file)
587        .category_fn(Box::new(|es| match es {
588            tracing_chrome::EventOrSpan::Event(event) => format!("{},{}", event.metadata().target(), event.metadata().level()),
589            tracing_chrome::EventOrSpan::Span(span_ref) => format!("{},{}", span_ref.metadata().target(), span_ref.metadata().level()),
590        }))
591        .build();
592    *rec = Some(guard);
593
594    let env_layer = EnvFilter::try_from_env("ZNG_RECORD_TRACE_FILTER")
595        .or_else(|_| EnvFilter::try_from_default_env())
596        .unwrap_or_else(|_| EnvFilter::new("trace"));
597
598    tracing_subscriber::registry().with(env_layer).with(chrome_layer).init();
599    zng_env::on_process_exit(|_| stop_recording());
600
601    tracing::info!("zng-record-start: {process_start}");
602}
603
604/// Stops recording and flushes.
605///
606/// Note that this is called automatically on process exit.
607pub fn stop_recording() {
608    *recording() = None;
609}
610
611zng_env::on_process_start!(|_| {
612    if std::env::var("ZNG_RECORD_TRACE").is_ok() {
613        start_recording(std::env::var("ZNG_RECORD_TRACE_DIR").ok().map(PathBuf::from));
614    }
615});
616
// Holds the `tracing-chrome` flush guard while recording, is `None` when not recording.
zng_app_context::hot_static! {
    static RECORDING: Mutex<Option<tracing_chrome::FlushGuard>> = Mutex::new(None);
}
620fn recording() -> parking_lot::MutexGuard<'static, Option<tracing_chrome::FlushGuard>> {
621    zng_app_context::hot_static_ref!(RECORDING).lock()
622}