1use std::{
2 collections::{HashMap, HashSet},
3 io::{Read as _, Write},
4 path::PathBuf,
5 time::SystemTime,
6};
7
8use clap::*;
9use serde::Deserialize as _;
10
11#[derive(Args, Debug, Default)]
12pub struct TraceArgs {
13 #[arg(trailing_var_arg = true)]
17 command: Vec<String>,
18
19 #[arg(long, short, default_value = "trace")]
21 filter: String,
22
23 #[arg(long, short, default_value = "./trace-{timestamp}.json")]
27 output: String,
28}
29
30pub fn run(args: TraceArgs) {
31 let mut cmd = {
32 let mut cmd = args.command.into_iter().peekable();
33 if let Some(c) = cmd.peek()
34 && c == "--"
35 {
36 cmd.next();
37 }
38 if let Some(c) = cmd.next() {
39 let mut o = std::process::Command::new(c);
40 o.args(cmd);
41 o
42 } else {
43 fatal!("COMMAND is required")
44 }
45 };
46
47 let ts = SystemTime::now()
48 .duration_since(SystemTime::UNIX_EPOCH)
49 .unwrap_or_default()
50 .as_micros()
51 .to_string();
52
53 let tmp = std::env::temp_dir().join("cargo-zng-trace");
54 if let Err(e) = std::fs::create_dir_all(&tmp) {
55 fatal!("cannot create temp dir, {e}");
56 }
57 let out_dir = tmp.join(&ts);
58 let _ = std::fs::remove_dir_all(&out_dir);
59
60 let out_file = PathBuf::from(args.output.replace("{timestamp}", &ts).replace("{ts}", &ts));
61 if let Some(p) = out_file.parent()
62 && let Err(e) = std::fs::create_dir_all(p)
63 {
64 fatal!("cannot output to {}, {e}", out_file.display());
65 }
66 let mut out = match std::fs::File::create(&out_file) {
67 Ok(f) => f,
68 Err(e) => fatal!("cannot output to {}, {e}", out_file.display()),
69 };
70
71 cmd.env("ZNG_RECORD_TRACE", "")
72 .env("ZNG_RECORD_TRACE_DIR", &tmp)
73 .env("ZNG_RECORD_TRACE_FILTER", args.filter)
74 .env("ZNG_RECORD_TRACE_TIMESTAMP", &ts);
75
76 let mut cmd = match cmd.spawn() {
77 Ok(c) => c,
78 Err(e) => fatal!("cannot run, {e}"),
79 };
80
81 let code = match cmd.wait() {
82 Ok(s) => s.code().unwrap_or(0),
83 Err(e) => {
84 error!("cannot wait command exit, {e}");
85 101
86 }
87 };
88
89 if !out_dir.exists() {
90 fatal!("run did not save any trace\nnote: the feature \"trace_recorder\" must be enabled during build")
91 }
92
93 println!("merging trace files...");
94
95 out.write_all(b"[\n")
96 .unwrap_or_else(|e| fatal!("cannot write {}, {e}", out_file.display()));
97 let mut separator = "";
98
99 for trace in glob::glob(out_dir.join("*.json").display().to_string().as_str())
100 .ok()
101 .into_iter()
102 .flatten()
103 {
104 let trace = match trace {
105 Ok(t) => t,
106 Err(e) => {
107 error!("error globing trace files, {e}");
108 continue;
109 }
110 };
111 let json = match std::fs::read_to_string(&trace) {
112 Ok(s) => s,
113 Err(e) => {
114 error!("cannot read {}, {e}", trace.display());
115 continue;
116 }
117 };
118
119 let name_sys_pid = trace
120 .file_name()
121 .unwrap_or_default()
122 .to_string_lossy()
123 .strip_suffix(".json")
124 .unwrap_or_default()
125 .to_owned();
126 let name_sys_pid = match name_sys_pid.parse::<u64>() {
127 Ok(i) => i,
128 Err(_) => {
129 error!("expected only {{pid}}.json files");
130 continue;
131 }
132 };
133
134 let json = json.trim_start();
136 if !json.starts_with('[') {
137 error!("unknown format in {}", trace.display());
138 continue;
139 }
140 let json = &json[1..];
141
142 let mut thread_ids = HashSet::new();
143 {
145 let mut search = json;
146 let prefix = r#""tid":"#;
147 while let Some(i) = search.find(prefix) {
148 search = &search[i + prefix.len()..];
149 if let Some(i) = search.find(',') {
150 let tid = &search[..i];
151 if let Ok(tid) = tid.parse::<u64>() {
152 thread_ids.insert(tid);
153 }
154 search = &search[i + 1..];
155 }
156 }
157 }
158 let mut custom_thread_id = 9000u64;
159 let mut custom_threads = HashMap::new();
160
161 let mut reader = std::io::Cursor::new(json.as_bytes());
162 loop {
163 let mut pos = reader.position();
165 let mut buf = [0u8];
166 while reader.read(&mut buf).is_ok() {
167 if !b" \r\n\t,".contains(&buf[0]) {
168 break;
169 }
170 pos = reader.position();
171 }
172 reader.set_position(pos);
173 let mut de = serde_json::Deserializer::from_reader(&mut reader);
174 match serde_json::Value::deserialize(&mut de) {
175 Ok(mut entry) => {
176 if let Some(serde_json::Value::Number(pid)) = entry.get_mut("pid") {
178 if pid.as_u64() != Some(1) {
179 error!("expected only pid:1 in trace file");
180 continue;
181 }
182 *pid = serde_json::Number::from(name_sys_pid);
183 }
184
185 match &mut entry {
187 serde_json::Value::Object(entry) => {
188 enum Phase {
189 Event,
190 Begin,
191 End,
192 Other,
193 }
194 let mut ph = Phase::Other;
195 if let Some(serde_json::Value::String(p)) = entry.get("ph") {
196 match p.as_str() {
197 "i" => ph = Phase::Event,
198 "B" => ph = Phase::Begin,
199 "E" => ph = Phase::End,
200 _ => {}
201 }
202 }
203 if let Some(serde_json::Value::Object(args)) = entry.get_mut("args") {
204 match ph {
205 Phase::Event => {
206 if let Some(serde_json::Value::String(msg)) = args.get("message")
208 && let Some(rest) = msg.strip_prefix("pid: ")
209 && let Some((sys_pid, p_name)) = rest.split_once(", name: ")
210 && let Ok(sys_pid) = sys_pid.parse::<u64>()
211 && name_sys_pid == sys_pid
212 {
213 let args = format_args!(
214 r#"{separator}{{"ph":"M","pid":{sys_pid},"name":"process_name","args":{{"name":"{p_name}"}}}}"#,
215 );
216 out.write_fmt(args)
217 .unwrap_or_else(|e| fatal!("cannot write {}, {e}", out_file.display()));
218 } else if let Some(serde_json::Value::String(custom_thread)) = args.remove("thread")
219 && let Some(serde_json::Value::Number(n)) = entry.get_mut("tid")
220 {
221 let tid = match custom_threads.entry(custom_thread) {
222 std::collections::hash_map::Entry::Occupied(e) => *e.get(),
223 std::collections::hash_map::Entry::Vacant(e) => {
224 let name = serde_json::Value::String(e.key().clone());
225
226 while !thread_ids.insert(custom_thread_id) {
227 custom_thread_id = custom_thread_id.wrapping_add(1);
228 }
229 e.insert(custom_thread_id);
230
231 let args = format_args!(
232 r#"{separator}{{"ph":"M","pid":{name_sys_pid},"tid":{custom_thread_id},"name":"thread_name","args":{{"name":{name}}}}}"#,
233 );
234 out.write_fmt(args)
235 .unwrap_or_else(|e| fatal!("cannot write {}, {e}", out_file.display()));
236
237 custom_thread_id
238 }
239 };
240 *n = tid.into();
241 }
242 }
243 Phase::Begin | Phase::End => {
244 if let Some(serde_json::Value::String(custom_thread)) = args.remove("thread")
246 && let Some(serde_json::Value::Number(n)) = entry.get_mut("tid")
247 {
248 let tid = match custom_threads.entry(custom_thread.clone()) {
249 std::collections::hash_map::Entry::Occupied(e) => *e.get(),
250 std::collections::hash_map::Entry::Vacant(e) => {
251 let name = serde_json::Value::String(e.key().clone());
252
253 while !thread_ids.insert(custom_thread_id) {
254 custom_thread_id = custom_thread_id.wrapping_add(1);
255 }
256 e.insert(custom_thread_id);
257
258 let args = format_args!(
259 r#"{separator}{{"ph":"M","pid":{name_sys_pid},"tid":{custom_thread_id},"name":"thread_name","args":{{"name":{name}}}}}"#,
260 );
261 out.write_fmt(args)
262 .unwrap_or_else(|e| fatal!("cannot write {}, {e}", out_file.display()));
263
264 custom_thread_id
265 }
266 };
267 *n = tid.into();
268 }
269 }
270 Phase::Other => {}
271 }
272 }
273 }
274 _ => {
275 error!("unknown format in {}", trace.display());
276 }
277 }
278
279 out.write_all(separator.as_bytes())
280 .unwrap_or_else(|e| fatal!("cannot write {}, {e}", out_file.display()));
281 serde_json::to_writer(&mut out, &entry).unwrap_or_else(|e| fatal!("cannot write {}, {e}", out_file.display()));
282 separator = ",\n";
283 }
284 Err(_) => break,
285 }
286 }
287 }
288
289 out.write_all(b"\n]")
290 .unwrap_or_else(|e| fatal!("cannot write {}, {e}", out_file.display()));
291 println!("saved to {}", out_file.display());
292
293 if code == 0 {
294 crate::util::exit();
295 } else {
296 std::process::exit(code);
298 }
299}