1#![cfg(all(
2 feature = "trace_recorder",
3 not(any(target_arch = "wasm32", target_os = "android", target_os = "ios"))
4))]
5
6use std::{
12 collections::HashMap,
13 fmt::{self, Write as _},
14 io::{self, Read},
15 path::{Path, PathBuf},
16 time::{Duration, SystemTime},
17};
18
19use parking_lot::Mutex;
20use serde::Deserialize as _;
21use tracing_subscriber::{filter::EnvFilter, layer::SubscriberExt as _, util::SubscriberInitExt as _};
22use zng_txt::{ToTxt as _, Txt};
23
24#[derive(Clone, Debug)]
26#[non_exhaustive]
27pub struct Trace {
28 pub processes: Vec<ProcessTrace>,
30}
31
32#[derive(Clone, Debug)]
34#[non_exhaustive]
35pub struct ProcessTrace {
36 pub id: u64,
38
39 pub name: Txt,
41
42 pub threads: Vec<ThreadTrace>,
44
45 pub start: SystemTime,
51}
52
53#[derive(Clone)]
55#[non_exhaustive]
56pub struct ThreadTrace {
57 pub name: Txt,
59
60 pub events: Vec<EventTrace>,
62 pub spans: Vec<SpanTrace>,
64}
65impl fmt::Debug for ThreadTrace {
66 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67 f.debug_struct("ThreadTrace")
68 .field("name", &self.name)
69 .field("events.len()", &self.events.len())
70 .field("spans.len()", &self.spans.len())
71 .finish()
72 }
73}
74
75#[derive(Clone, Debug)]
77#[non_exhaustive]
78pub struct EventTrace {
79 pub info: Info,
81 pub instant: Duration,
83}
84
85#[derive(Clone, Debug)]
87#[non_exhaustive]
88pub struct SpanTrace {
89 pub info: Info,
91
92 pub start: Duration,
94 pub end: Duration,
96}
97
98#[derive(Clone, Debug)]
100#[non_exhaustive]
101pub struct Info {
102 pub name: Txt,
104 pub categories: Vec<Txt>,
108 pub file: Txt,
110 pub line: u32,
112 pub args: HashMap<Txt, Txt>,
114}
115
116impl Trace {
117 pub fn read_chrome_trace(json_path: impl AsRef<Path>) -> io::Result<Self> {
123 let json = std::fs::read_to_string(json_path)?;
124 let trace = Self::parse_chrome_trace(&json)?;
125 Ok(trace)
126 }
127
128 pub fn parse_chrome_trace(json: &str) -> io::Result<Self> {
134 fn invalid_data(msg: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> io::Error {
135 io::Error::new(io::ErrorKind::InvalidData, msg)
136 }
137
138 let json = json.trim_start();
140 if !json.starts_with('[') {
141 return Err(invalid_data("expected JSON array"));
142 }
143 let json = &json[1..];
144
145 enum Phase {
146 Begin,
147 End,
148 Event,
149 }
150 struct Entry {
151 phase: Phase,
152 pid: u64,
153 tid: u64,
154 ts: Duration,
155 name: Txt,
156 cat: Vec<Txt>,
157 file: Txt,
158 line: u32,
159 args: HashMap<Txt, Txt>,
160 }
161 let mut process_sys_pid = HashMap::new();
162 let mut process_names = HashMap::new();
163 let mut process_record_start = HashMap::new();
164 let mut thread_names = HashMap::new();
165 let mut entries = vec![];
166
167 let mut reader = std::io::Cursor::new(json.as_bytes());
168 loop {
169 let mut pos = reader.position();
171 let mut buf = [0u8];
172 while reader.read(&mut buf).is_ok() {
173 if !b" \r\n\t,".contains(&buf[0]) {
174 break;
175 }
176 pos = reader.position();
177 }
178 reader.set_position(pos);
179 let mut de = serde_json::Deserializer::from_reader(&mut reader);
180 match serde_json::Value::deserialize(&mut de) {
181 Ok(entry) => match entry {
182 serde_json::Value::Object(map) => {
183 let phase_str = match map.get("ph") {
184 Some(serde_json::Value::String(ph)) => match ph.as_str() {
185 "B" | "E" | "i" | "M" => ph.as_str(),
186 u => {
187 tracing::error!("ignoring unknown or unsupported phase `{u:?}`");
188 continue;
189 }
190 },
191 _ => return Err(invalid_data("expected \"ph\"")),
192 };
193 let pid = match map.get("pid") {
194 Some(serde_json::Value::Number(n)) => match n.as_u64() {
195 Some(pid) => pid,
196 None => return Err(invalid_data("expected \"pid\"")),
197 },
198 _ => return Err(invalid_data("expected \"pid\"")),
199 };
200 let name = match map.get("name") {
201 Some(serde_json::Value::String(name)) => name.to_txt(),
202 _ => return Err(invalid_data("expected \"name\"")),
203 };
204
205 let args: HashMap<Txt, Txt> = match map.get("args") {
206 Some(a) => match serde_json::from_value(a.clone()) {
207 Ok(a) => a,
208 Err(e) => {
209 tracing::error!("only simple text args are supported, {e}");
210 continue;
211 }
212 },
213 _ => HashMap::new(),
214 };
215
216 if phase_str == "M" && name == "process_name" {
217 if let Some(n) = args.get("name") {
218 process_names.insert(pid, n.to_txt());
219 }
220 continue;
221 }
222
223 let tid = match map.get("tid") {
224 Some(serde_json::Value::Number(n)) => match n.as_u64() {
225 Some(tid) => tid,
226 None => return Err(invalid_data("expected \"tid\"")),
227 },
228 _ => return Err(invalid_data("expected \"tid\"")),
229 };
230
231 let phase = match phase_str {
232 "B" => Phase::Begin,
233 "E" => Phase::End,
234 "i" => Phase::Event,
235 "M" => {
236 if name == "thread_name"
237 && let Some(n) = args.get("name")
238 {
239 thread_names.insert(tid, n.to_txt());
240 }
241 continue;
242 }
243 _ => unreachable!(),
244 };
245
246 let ts = match map.get("ts") {
247 Some(serde_json::Value::Number(ts)) => match ts.as_f64() {
248 Some(ts) => Duration::from_nanos((ts * 1000.0).round() as u64),
249 None => return Err(invalid_data("expected \"ts\"")),
250 },
251 _ => return Err(invalid_data("expected \"ts\"")),
252 };
253 let cat = match map.get("cat") {
254 Some(serde_json::Value::String(cat)) => cat.split(',').map(|c| c.trim().to_txt()).collect(),
255 _ => vec![],
256 };
257 let file = match map.get(".file") {
258 Some(serde_json::Value::String(file)) => file.to_txt(),
259 _ => Txt::from_static(""),
260 };
261 let line = match map.get(".line") {
262 Some(serde_json::Value::Number(line)) => line.as_u64().unwrap_or(0) as u32,
263 _ => 0,
264 };
265
266 if let Some(msg) = args.get("message") {
267 if let Some(process_ts) = msg.strip_prefix("zng-record-start: ") {
268 if let Ok(process_ts) = process_ts.parse::<u64>() {
269 process_record_start.insert(pid, SystemTime::UNIX_EPOCH + Duration::from_micros(process_ts));
270 }
271 } else if let Some(rest) = msg.strip_prefix("pid: ")
272 && let Some((sys_pid, p_name)) = rest.split_once(", name: ")
273 && let Ok(sys_pid) = sys_pid.parse::<u64>()
274 {
275 process_sys_pid.insert(pid, sys_pid);
276 process_names.insert(pid, p_name.to_txt());
277 }
278 }
279
280 entries.push(Entry {
281 phase,
282 pid,
283 tid,
284 ts,
285 name,
286 cat,
287 file,
288 line,
289 args,
290 });
291 }
292 _ => return Err(invalid_data("expected JSON array of objects")),
293 },
294 Err(_) => {
295 break;
297 }
298 }
299 }
300
301 let mut out = Trace { processes: vec![] };
302
303 for mut entry in entries {
304 let sys_pid = *process_sys_pid.entry(entry.pid).or_insert(entry.pid);
305 let process = if let Some(p) = out.processes.iter_mut().find(|p| p.id == sys_pid) {
306 p
307 } else {
308 out.processes.push(ProcessTrace {
309 id: sys_pid,
310 name: process_names.entry(entry.pid).or_insert_with(|| sys_pid.to_txt()).clone(),
311 threads: vec![],
312 start: process_record_start.get(&entry.pid).copied().unwrap_or(SystemTime::UNIX_EPOCH),
313 });
314 out.processes.last_mut().unwrap()
315 };
316
317 let thread_name = if let Some(custom_name) = entry.args.remove("thread") {
318 custom_name.clone()
319 } else {
320 thread_names.entry(entry.tid).or_insert_with(|| entry.tid.to_txt()).clone()
321 };
322 let thread = if let Some(t) = process.threads.iter_mut().find(|t| t.name == thread_name) {
323 t
324 } else {
325 process.threads.push(ThreadTrace {
326 name: thread_name,
327 events: vec![],
328 spans: vec![],
329 });
330 process.threads.last_mut().unwrap()
331 };
332
333 fn entry_to_info(entry: Entry) -> Info {
334 Info {
335 name: entry.name,
336 categories: entry.cat,
337 file: entry.file,
338 line: entry.line,
339 args: entry.args,
340 }
341 }
342
343 match entry.phase {
344 Phase::Begin => thread.spans.push(SpanTrace {
345 start: entry.ts,
346 end: entry.ts,
347 info: entry_to_info(entry),
348 }),
349 Phase::End => {
350 let end = entry.ts;
351 let info = entry_to_info(entry);
352 if let Some(open) = thread.spans.iter_mut().rev().find(|s| s.start == s.end && s.info.name == info.name) {
355 open.end = end;
356 if open.start == open.end {
357 open.end += Duration::from_nanos(1);
359 }
360 open.info.merge(info);
361 }
362 }
363 Phase::Event => thread.events.push(EventTrace {
364 instant: entry.ts,
365 info: entry_to_info(entry),
366 }),
367 }
368 }
369
370 Ok(out)
371 }
372
373 pub fn to_chrome_trace(&self) -> Txt {
375 let mut out = String::new();
376
377 let _ = writeln!(&mut out, "[");
378
379 let mut sep = "";
380 for p in &self.processes {
381 let _ = write!(
382 &mut out,
383 r#"{sep}{{"ph":"M","pid":{},"name":"process_name","args":{{"name":"{}"}}}}"#,
384 p.id, p.name
385 );
386 sep = ",\n";
387 for (tid, t) in p.threads.iter().enumerate() {
388 let _ = write!(
389 &mut out,
390 r#"{sep}{{"ph":"M","pid":{}, "tid":{tid},"name":"process_name","args":{{"name":"{}"}}}}"#,
391 p.id, t.name
392 );
393
394 let mut items = Vec::with_capacity(t.events.len() + t.spans.len() * 2);
395 for ev in &t.events {
396 let obj = serde_json::json!({
397 "ph": "i",
398 "s": "t",
399 "ts": (ev.instant.as_nanos() as f64 / 1000.0),
400 "pid": p.id,
401 "tid": tid,
402 "name": ev.info.name,
403 "cat": ev.info.categories.iter().fold(String::new(), |a, b| format!("{a},{b}")),
404 "args": ev.info.args,
405 ".file": ev.info.file,
406 ".line": ev.info.line,
407 });
408 items.push((ev.instant, obj));
409 }
410 for sp in &t.spans {
411 let start = serde_json::json!({
412 "ph": "B",
413 "s": "t",
414 "ts": (sp.start.as_nanos() as f64 / 1000.0),
415 "pid": p.id,
416 "tid": tid,
417 "name": sp.info.name,
418 "cat": sp.info.categories.iter().fold(String::new(), |a, b| format!("{a},{b}")),
419 "args": sp.info.args,
420 ".file": sp.info.file,
421 ".line": sp.info.line,
422 });
423 items.push((sp.start, start));
424
425 let end = serde_json::json!({
426 "ph": "E",
427 "s": "t",
428 "ts": (sp.end.as_nanos() as f64 / 1000.0),
429 "pid": p.id,
430 "tid": tid,
431 "name": sp.info.name,
432 "cat": sp.info.categories.iter().fold(String::new(), |a, b| format!("{a},{b}")),
433 "args": sp.info.args,
434 ".file": sp.info.file,
435 ".line": sp.info.line,
436 });
437 items.push((sp.end, end));
438 }
439
440 items.sort_by(|a, b| a.0.cmp(&b.0));
441
442 for (_, item) in items {
443 let item = serde_json::to_string(&item).unwrap();
444 let _ = write!(&mut out, "{sep}{item}");
445 }
446 }
447 }
448
449 let _ = writeln!(&mut out, "]");
450 out.to_txt()
451 }
452
453 pub fn write_chrome_trace(&self, json_path: impl AsRef<Path>) -> io::Result<()> {
455 std::fs::write(json_path, self.to_chrome_trace().as_str().as_bytes())
456 }
457
458 pub fn merge(&mut self, other: Self) {
460 for p in other.processes {
461 if let Some(ep) = self.processes.iter_mut().find(|ep| ep.id == p.id && ep.name == p.name) {
462 ep.merge(p);
463 } else {
464 self.processes.push(p);
465 }
466 }
467 }
468
469 pub fn sort(&mut self) {
471 self.processes.sort_by(|a, b| a.start.cmp(&b.start).then(a.name.cmp(&b.name)));
472 for p in &mut self.processes {
473 p.sort();
474 }
475 }
476}
477
478impl ProcessTrace {
479 pub fn merge(&mut self, other: Self) {
481 for t in other.threads {
482 if let Some(et) = self.threads.iter_mut().find(|et| et.name == t.name) {
483 et.merge(t);
484 } else {
485 self.threads.push(t);
486 }
487 }
488 }
489
490 pub fn sort(&mut self) {
492 self.threads.sort_by(|a, b| a.start().cmp(&b.start()).then(a.name.cmp(&b.name)));
493 for t in &mut self.threads {
494 t.sort();
495 }
496 }
497}
498
499impl ThreadTrace {
500 pub fn start(&self) -> Duration {
502 self.events
503 .iter()
504 .map(|e| e.instant)
505 .min()
506 .unwrap_or(Duration::MAX)
507 .min(self.spans.iter().map(|e| e.start).min().unwrap_or(Duration::MAX))
508 }
509
510 pub fn merge(&mut self, mut other: Self) {
512 self.events.append(&mut other.events);
513 self.spans.append(&mut other.spans);
514 }
515
516 pub fn sort(&mut self) {
520 self.events.sort_by(|a, b| a.instant.cmp(&b.instant));
521 self.spans.sort_by(|a, b| a.start.cmp(&b.start).then(b.start.cmp(&a.start)));
522 }
523}
524
525impl Info {
526 pub fn merge(&mut self, info: Info) {
528 if !info.file.is_empty() {
529 self.file = info.file;
530 self.line = info.line;
531 }
532 self.args.extend(info.args);
533 }
534}
535
536pub fn start_recording(output_dir: Option<PathBuf>) {
554 let mut rec = recording();
555 if rec.is_some() {
556 return;
558 }
559
560 let process_start = std::time::SystemTime::now()
561 .duration_since(std::time::SystemTime::UNIX_EPOCH)
562 .expect("cannot define process start timestamp")
563 .as_micros();
564
565 let output_dir = output_dir.unwrap_or_else(|| std::env::current_dir().expect("`current_dir` error").join("zng-trace"));
566
567 let timestamp = match std::env::var("ZNG_RECORD_TRACE_TIMESTAMP") {
569 Ok(t) => t,
570 Err(_) => {
571 let t = process_start.to_string();
572 unsafe {
574 std::env::set_var("ZNG_RECORD_TRACE_TIMESTAMP", t.clone());
575 }
576 t
577 }
578 };
579
580 let output_dir = output_dir.join(timestamp);
581 std::fs::create_dir_all(&output_dir).expect("cannot create `output_dir`");
582 let output_file = output_dir.join(format!("{}.json", std::process::id()));
583
584 let (chrome_layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
585 .include_args(true)
586 .file(output_file)
587 .category_fn(Box::new(|es| match es {
588 tracing_chrome::EventOrSpan::Event(event) => format!("{},{}", event.metadata().target(), event.metadata().level()),
589 tracing_chrome::EventOrSpan::Span(span_ref) => format!("{},{}", span_ref.metadata().target(), span_ref.metadata().level()),
590 }))
591 .build();
592 *rec = Some(guard);
593
594 let env_layer = EnvFilter::try_from_env("ZNG_RECORD_TRACE_FILTER")
595 .or_else(|_| EnvFilter::try_from_default_env())
596 .unwrap_or_else(|_| EnvFilter::new("trace"));
597
598 tracing_subscriber::registry().with(env_layer).with(chrome_layer).init();
599 zng_env::on_process_exit(|_| stop_recording());
600
601 tracing::info!("zng-record-start: {process_start}");
602}
603
604pub fn stop_recording() {
608 *recording() = None;
609}
610
611zng_env::on_process_start!(|_| {
612 if std::env::var("ZNG_RECORD_TRACE").is_ok() {
613 start_recording(std::env::var("ZNG_RECORD_TRACE_DIR").ok().map(PathBuf::from));
614 }
615});
616
617zng_app_context::hot_static! {
618 static RECORDING: Mutex<Option<tracing_chrome::FlushGuard>> = Mutex::new(None);
619}
620fn recording() -> parking_lot::MutexGuard<'static, Option<tracing_chrome::FlushGuard>> {
621 zng_app_context::hot_static_ref!(RECORDING).lock()
622}