1#![cfg(all(
2 feature = "trace_recorder",
3 not(any(target_arch = "wasm32", target_os = "android", target_os = "ios"))
4))]
5
6use std::{
12 collections::HashMap,
13 fmt::{self, Write as _},
14 io::{self, Read},
15 path::{Path, PathBuf},
16 time::{Duration, SystemTime},
17};
18
19use parking_lot::Mutex;
20use serde::Deserialize as _;
21use tracing_subscriber::{filter::EnvFilter, layer::SubscriberExt as _, util::SubscriberInitExt as _};
22use zng_txt::{ToTxt as _, Txt};
23
24#[derive(Clone, Debug)]
26#[non_exhaustive]
27pub struct Trace {
28 pub processes: Vec<ProcessTrace>,
30}
31
32#[derive(Clone, Debug)]
34#[non_exhaustive]
35pub struct ProcessTrace {
36 pub id: u64,
38
39 pub name: Txt,
41
42 pub threads: Vec<ThreadTrace>,
44
45 pub start: SystemTime,
51}
52
53#[derive(Clone)]
55#[non_exhaustive]
56pub struct ThreadTrace {
57 pub name: Txt,
59
60 pub events: Vec<EventTrace>,
62 pub spans: Vec<SpanTrace>,
64}
65impl fmt::Debug for ThreadTrace {
66 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67 f.debug_struct("ThreadTrace")
68 .field("name", &self.name)
69 .field("events.len()", &self.events.len())
70 .field("spans.len()", &self.spans.len())
71 .finish()
72 }
73}
74
75#[derive(Clone, Debug)]
77#[non_exhaustive]
78pub struct EventTrace {
79 pub info: Info,
81 pub instant: Duration,
83}
84
85#[derive(Clone, Debug)]
87#[non_exhaustive]
88pub struct SpanTrace {
89 pub info: Info,
91
92 pub start: Duration,
94 pub end: Duration,
96}
97
98#[derive(Clone, Debug)]
100#[non_exhaustive]
101pub struct Info {
102 pub name: Txt,
104 pub categories: Vec<Txt>,
108 pub file: Txt,
110 pub line: u32,
112 pub args: HashMap<Txt, Txt>,
114}
115
116impl Trace {
117 pub fn read_chrome_trace(json_path: impl AsRef<Path>) -> io::Result<Self> {
123 let json = std::fs::read_to_string(json_path)?;
124 let trace = Self::parse_chrome_trace(&json)?;
125 Ok(trace)
126 }
127
128 pub fn parse_chrome_trace(json: &str) -> io::Result<Self> {
134 fn invalid_data(msg: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> io::Error {
135 io::Error::new(io::ErrorKind::InvalidData, msg)
136 }
137
138 let json = json.trim_start();
140 if !json.starts_with('[') {
141 return Err(invalid_data("expected JSON array"));
142 }
143 let json = &json[1..];
144
145 enum Phase {
146 Begin,
147 End,
148 Event,
149 }
150 struct Entry {
151 phase: Phase,
152 pid: u64,
153 tid: u64,
154 ts: Duration,
155 name: Txt,
156 cat: Vec<Txt>,
157 file: Txt,
158 line: u32,
159 args: HashMap<Txt, Txt>,
160 }
161 let mut process_sys_pid = HashMap::new();
162 let mut process_names = HashMap::new();
163 let mut process_record_start = HashMap::new();
164 let mut thread_names = HashMap::new();
165 let mut entries = vec![];
166
167 let mut reader = std::io::Cursor::new(json.as_bytes());
168 loop {
169 let mut pos = reader.position();
171 let mut buf = [0u8];
172 while reader.read(&mut buf).is_ok() {
173 if !b" \r\n\t,".contains(&buf[0]) {
174 break;
175 }
176 pos = reader.position();
177 }
178 reader.set_position(pos);
179 let mut de = serde_json::Deserializer::from_reader(&mut reader);
180 match serde_json::Value::deserialize(&mut de) {
181 Ok(entry) => match entry {
182 serde_json::Value::Object(map) => {
183 let phase_str = match map.get("ph") {
184 Some(serde_json::Value::String(ph)) => match ph.as_str() {
185 "B" | "E" | "i" | "M" => ph.as_str(),
186 u => {
187 tracing::error!("ignoring unknown or unsupported phase `{u:?}`");
188 continue;
189 }
190 },
191 _ => return Err(invalid_data("expected \"ph\"")),
192 };
193 let pid = match map.get("pid") {
194 Some(serde_json::Value::Number(n)) => match n.as_u64() {
195 Some(pid) => pid,
196 None => return Err(invalid_data("expected \"pid\"")),
197 },
198 _ => return Err(invalid_data("expected \"pid\"")),
199 };
200 let name = match map.get("name") {
201 Some(serde_json::Value::String(name)) => name.to_txt(),
202 _ => return Err(invalid_data("expected \"name\"")),
203 };
204
205 let args: HashMap<Txt, Txt> = match map.get("args") {
206 Some(a) => match serde_json::from_value(a.clone()) {
207 Ok(a) => a,
208 Err(e) => {
209 tracing::error!("only simple text args are supported, {e}");
210 continue;
211 }
212 },
213 _ => HashMap::new(),
214 };
215
216 if phase_str == "M" && name == "process_name" {
217 if let Some(n) = args.get("name") {
218 process_names.insert(pid, n.to_txt());
219 }
220 continue;
221 }
222
223 let tid = match map.get("tid") {
224 Some(serde_json::Value::Number(n)) => match n.as_u64() {
225 Some(tid) => tid,
226 None => return Err(invalid_data("expected \"tid\"")),
227 },
228 _ => return Err(invalid_data("expected \"tid\"")),
229 };
230
231 let phase = match phase_str {
232 "B" => Phase::Begin,
233 "E" => Phase::End,
234 "i" => Phase::Event,
235 "M" => {
236 if name == "thread_name"
237 && let Some(n) = args.get("name")
238 {
239 thread_names.insert(tid, n.to_txt());
240 }
241 continue;
242 }
243 _ => unreachable!(),
244 };
245
246 let ts = match map.get("ts") {
247 Some(serde_json::Value::Number(ts)) => match ts.as_f64() {
248 Some(ts) => Duration::from_nanos((ts * 1000.0).round() as u64),
249 None => return Err(invalid_data("expected \"ts\"")),
250 },
251 _ => return Err(invalid_data("expected \"ts\"")),
252 };
253 let cat = match map.get("cat") {
254 Some(serde_json::Value::String(cat)) => cat.split(',').map(|c| c.trim().to_txt()).collect(),
255 _ => vec![],
256 };
257 let file = match map.get(".file") {
258 Some(serde_json::Value::String(file)) => file.to_txt(),
259 _ => Txt::from_static(""),
260 };
261 let line = match map.get(".line") {
262 Some(serde_json::Value::Number(line)) => line.as_u64().unwrap_or(0) as u32,
263 _ => 0,
264 };
265
266 if let Some(msg) = args.get("message") {
267 if let Some(process_ts) = msg.strip_prefix("zng-record-start: ") {
268 if let Ok(process_ts) = process_ts.parse::<u64>() {
269 process_record_start.insert(pid, SystemTime::UNIX_EPOCH + Duration::from_micros(process_ts));
270 }
271 } else if let Some(rest) = msg.strip_prefix("pid: ")
272 && let Some((sys_pid, p_name)) = rest.split_once(", name: ")
273 && let Ok(sys_pid) = sys_pid.parse::<u64>()
274 {
275 process_sys_pid.insert(pid, sys_pid);
276 process_names.insert(pid, p_name.to_txt());
277 }
278 }
279
280 entries.push(Entry {
281 phase,
282 pid,
283 tid,
284 ts,
285 name,
286 cat,
287 file,
288 line,
289 args,
290 });
291 }
292 _ => return Err(invalid_data("expected JSON array of objects")),
293 },
294 Err(_) => {
295 break;
297 }
298 }
299 }
300
301 let mut out = Trace { processes: vec![] };
302
303 for entry in entries {
304 let sys_pid = *process_sys_pid.entry(entry.pid).or_insert(entry.pid);
305 let process = if let Some(p) = out.processes.iter_mut().find(|p| p.id == sys_pid) {
306 p
307 } else {
308 out.processes.push(ProcessTrace {
309 id: sys_pid,
310 name: process_names.entry(entry.pid).or_insert_with(|| sys_pid.to_txt()).clone(),
311 threads: vec![],
312 start: process_record_start.get(&entry.pid).copied().unwrap_or(SystemTime::UNIX_EPOCH),
313 });
314 out.processes.last_mut().unwrap()
315 };
316
317 let thread_name = thread_names.entry(entry.tid).or_insert_with(|| entry.tid.to_txt()).clone();
318 let thread = if let Some(t) = process.threads.iter_mut().find(|t| t.name == thread_name) {
319 t
320 } else {
321 process.threads.push(ThreadTrace {
322 name: thread_name,
323 events: vec![],
324 spans: vec![],
325 });
326 process.threads.last_mut().unwrap()
327 };
328
329 fn entry_to_info(entry: Entry) -> Info {
330 Info {
331 name: entry.name,
332 categories: entry.cat,
333 file: entry.file,
334 line: entry.line,
335 args: entry.args,
336 }
337 }
338
339 match entry.phase {
340 Phase::Begin => thread.spans.push(SpanTrace {
341 start: entry.ts,
342 end: entry.ts,
343 info: entry_to_info(entry),
344 }),
345 Phase::End => {
346 let end = entry.ts;
347 let info = entry_to_info(entry);
348 if let Some(open) = thread.spans.iter_mut().rev().find(|s| s.start == s.end && s.info.name == info.name) {
351 open.end = end;
352 if open.start == open.end {
353 open.end += Duration::from_nanos(1);
355 }
356 open.info.merge(info);
357 }
358 }
359 Phase::Event => thread.events.push(EventTrace {
360 instant: entry.ts,
361 info: entry_to_info(entry),
362 }),
363 }
364 }
365
366 Ok(out)
367 }
368
369 pub fn to_chrome_trace(&self) -> Txt {
371 let mut out = String::new();
372
373 let _ = writeln!(&mut out, "[");
374
375 let mut sep = "";
376 for p in &self.processes {
377 let _ = write!(
378 &mut out,
379 r#"{sep}{{"ph":"M","pid":{},"name":"process_name","args":{{"name":"{}"}}}}"#,
380 p.id, p.name
381 );
382 sep = ",\n";
383 for (tid, t) in p.threads.iter().enumerate() {
384 let _ = write!(
385 &mut out,
386 r#"{sep}{{"ph":"M","pid":{}, "tid":{tid},"name":"process_name","args":{{"name":"{}"}}}}"#,
387 p.id, t.name
388 );
389
390 let mut items = Vec::with_capacity(t.events.len() + t.spans.len() * 2);
391 for ev in &t.events {
392 let obj = serde_json::json!({
393 "ph": "i",
394 "s": "t",
395 "ts": (ev.instant.as_nanos() as f64 / 1000.0),
396 "pid": p.id,
397 "tid": tid,
398 "name": ev.info.name,
399 "cat": ev.info.categories.iter().fold(String::new(), |a, b| format!("{a},{b}")),
400 "args": ev.info.args,
401 ".file": ev.info.file,
402 ".line": ev.info.line,
403 });
404 items.push((ev.instant, obj));
405 }
406 for sp in &t.spans {
407 let start = serde_json::json!({
408 "ph": "B",
409 "s": "t",
410 "ts": (sp.start.as_nanos() as f64 / 1000.0),
411 "pid": p.id,
412 "tid": tid,
413 "name": sp.info.name,
414 "cat": sp.info.categories.iter().fold(String::new(), |a, b| format!("{a},{b}")),
415 "args": sp.info.args,
416 ".file": sp.info.file,
417 ".line": sp.info.line,
418 });
419 items.push((sp.start, start));
420
421 let end = serde_json::json!({
422 "ph": "E",
423 "s": "t",
424 "ts": (sp.end.as_nanos() as f64 / 1000.0),
425 "pid": p.id,
426 "tid": tid,
427 "name": sp.info.name,
428 "cat": sp.info.categories.iter().fold(String::new(), |a, b| format!("{a},{b}")),
429 "args": sp.info.args,
430 ".file": sp.info.file,
431 ".line": sp.info.line,
432 });
433 items.push((sp.end, end));
434 }
435
436 items.sort_by(|a, b| a.0.cmp(&b.0));
437
438 for (_, item) in items {
439 let item = serde_json::to_string(&item).unwrap();
440 let _ = write!(&mut out, "{sep}{item}");
441 }
442 }
443 }
444
445 let _ = writeln!(&mut out, "]");
446 out.to_txt()
447 }
448
449 pub fn write_chrome_trace(&self, json_path: impl AsRef<Path>) -> io::Result<()> {
451 std::fs::write(json_path, self.to_chrome_trace().as_str().as_bytes())
452 }
453
454 pub fn merge(&mut self, other: Self) {
456 for p in other.processes {
457 if let Some(ep) = self.processes.iter_mut().find(|ep| ep.id == p.id && ep.name == p.name) {
458 ep.merge(p);
459 } else {
460 self.processes.push(p);
461 }
462 }
463 }
464
465 pub fn sort(&mut self) {
467 self.processes.sort_by(|a, b| a.start.cmp(&b.start).then(a.name.cmp(&b.name)));
468 for p in &mut self.processes {
469 p.sort();
470 }
471 }
472}
473
474impl ProcessTrace {
475 pub fn merge(&mut self, other: Self) {
477 for t in other.threads {
478 if let Some(et) = self.threads.iter_mut().find(|et| et.name == t.name) {
479 et.merge(t);
480 } else {
481 self.threads.push(t);
482 }
483 }
484 }
485
486 pub fn sort(&mut self) {
488 self.threads.sort_by(|a, b| a.start().cmp(&b.start()).then(a.name.cmp(&b.name)));
489 for t in &mut self.threads {
490 t.sort();
491 }
492 }
493}
494
495impl ThreadTrace {
496 pub fn start(&self) -> Duration {
498 self.events
499 .iter()
500 .map(|e| e.instant)
501 .min()
502 .unwrap_or(Duration::MAX)
503 .min(self.spans.iter().map(|e| e.start).min().unwrap_or(Duration::MAX))
504 }
505
506 pub fn merge(&mut self, mut other: Self) {
508 self.events.append(&mut other.events);
509 self.spans.append(&mut other.spans);
510 }
511
512 pub fn sort(&mut self) {
516 self.events.sort_by(|a, b| a.instant.cmp(&b.instant));
517 self.spans.sort_by(|a, b| a.start.cmp(&b.start).then(b.start.cmp(&a.start)));
518 }
519}
520
521impl Info {
522 pub fn merge(&mut self, info: Info) {
524 if !info.file.is_empty() {
525 self.file = info.file;
526 self.line = info.line;
527 }
528 self.args.extend(info.args);
529 }
530}
531
532pub fn start_recording(output_dir: Option<PathBuf>) {
550 let mut rec = recording();
551 if rec.is_some() {
552 return;
554 }
555
556 let process_start = std::time::SystemTime::now()
557 .duration_since(std::time::SystemTime::UNIX_EPOCH)
558 .expect("cannot define process start timestamp")
559 .as_micros();
560
561 let output_dir = output_dir.unwrap_or_else(|| std::env::current_dir().expect("`current_dir` error").join("zng-trace"));
562
563 let timestamp = match std::env::var("ZNG_RECORD_TRACE_TIMESTAMP") {
565 Ok(t) => t,
566 Err(_) => {
567 let t = process_start.to_string();
568 unsafe {
570 std::env::set_var("ZNG_RECORD_TRACE_TIMESTAMP", t.clone());
571 }
572 t
573 }
574 };
575
576 let output_dir = output_dir.join(timestamp);
577 std::fs::create_dir_all(&output_dir).expect("cannot create `output_dir`");
578 let output_file = output_dir.join(format!("{}.json", std::process::id()));
579
580 let (chrome_layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
581 .include_args(true)
582 .file(output_file)
583 .category_fn(Box::new(|es| match es {
584 tracing_chrome::EventOrSpan::Event(event) => format!("{},{}", event.metadata().target(), event.metadata().level()),
585 tracing_chrome::EventOrSpan::Span(span_ref) => format!("{},{}", span_ref.metadata().target(), span_ref.metadata().level()),
586 }))
587 .build();
588 *rec = Some(guard);
589
590 let env_layer = EnvFilter::try_from_env("ZNG_RECORD_TRACE_FILTER")
591 .or_else(|_| EnvFilter::try_from_default_env())
592 .unwrap_or_else(|_| EnvFilter::new("trace"));
593
594 tracing_subscriber::registry().with(env_layer).with(chrome_layer).init();
595 zng_env::on_process_exit(|_| stop_recording());
596
597 tracing::info!("zng-record-start: {process_start}");
598}
599
600pub fn stop_recording() {
604 *recording() = None;
605}
606
607zng_env::on_process_start!(|_| {
608 if std::env::var("ZNG_RECORD_TRACE").is_ok() {
609 start_recording(std::env::var("ZNG_RECORD_TRACE_DIR").ok().map(PathBuf::from));
610 }
611});
612
613zng_app_context::hot_static! {
614 static RECORDING: Mutex<Option<tracing_chrome::FlushGuard>> = Mutex::new(None);
615}
616fn recording() -> parking_lot::MutexGuard<'static, Option<tracing_chrome::FlushGuard>> {
617 zng_app_context::hot_static_ref!(RECORDING).lock()
618}