// zng_task/http/util.rs
1use std::time::Duration;
2
3pub fn lock_exclusive(file: &impl fs4::fs_std::FileExt, timeout: Duration) -> std::io::Result<()> {
5 lock_timeout(file, |f| f.try_lock_exclusive(), timeout)
6}
7
8pub fn lock_shared(file: &impl fs4::fs_std::FileExt, timeout: Duration) -> std::io::Result<()> {
10 lock_timeout(file, |f| f.try_lock_shared(), timeout)
11}
12
13fn lock_timeout<F: fs4::fs_std::FileExt>(
14 file: &F,
15 try_lock: impl Fn(&F) -> std::io::Result<()>,
16 mut timeout: Duration,
17) -> std::io::Result<()> {
18 let mut locked_error = None;
19 loop {
20 match try_lock(file) {
21 Ok(()) => return Ok(()),
22 Err(e) => {
23 if e.kind() != std::io::ErrorKind::WouldBlock
24 && e.raw_os_error() != locked_error.get_or_insert_with(fs4::lock_contended_error).raw_os_error()
25 {
26 return Err(e);
27 }
28
29 const INTERVAL: Duration = Duration::from_millis(10);
30 timeout = timeout.saturating_sub(INTERVAL);
31 if timeout.is_zero() {
32 return Err(std::io::Error::new(std::io::ErrorKind::TimedOut, e));
33 } else {
34 std::thread::sleep(INTERVAL.min(timeout));
35 }
36 }
37 }
38 }
39}
40
41pub fn unlock_ok(file: &impl fs4::fs_std::FileExt) -> std::io::Result<()> {
43 if let Err(e) = file.unlock() {
44 if let Some(code) = e.raw_os_error() {
45 #[cfg(windows)]
46 if code == 158 {
47 return Ok(());
49 }
50
51 #[cfg(unix)]
52 if code == 22 {
53 return Ok(());
55 }
56 }
57
58 Err(e)
59 } else {
60 Ok(())
61 }
62}
63
#[cfg(test)]
/// Installs a global `tracing` subscriber for tests that prints warning events to stderr
/// and panics on error events.
///
/// Only the first call installs the subscriber; later calls do nothing.
///
/// # Panics
///
/// * On the first call, if `tracing::subscriber::set_global_default` fails.
/// * Later, inside the code that logs, whenever an event of level `ERROR` is recorded.
pub fn test_log() {
    use std::sync::atomic::*;

    use std::fmt;
    use tracing::*;

    struct TestSubscriber;
    impl Subscriber for TestSubscriber {
        fn enabled(&self, metadata: &Metadata<'_>) -> bool {
            // `Level` is ordered by verbosity (ERROR < WARN < INFO < DEBUG < TRACE), so `<=`
            // is needed for warnings to reach `event`; a strict `<` lets only errors through.
            metadata.is_event() && metadata.level() <= &Level::WARN
        }

        // Spans are never enabled (see `enabled`), so the span methods are not implemented.
        fn new_span(&self, _span: &span::Attributes<'_>) -> span::Id {
            unimplemented!()
        }

        fn record(&self, _span: &span::Id, _values: &span::Record<'_>) {
            unimplemented!()
        }

        fn record_follows_from(&self, _span: &span::Id, _follows: &span::Id) {
            unimplemented!()
        }

        fn event(&self, event: &Event<'_>) {
            // Appends every field of the event to the message as `name = value` lines.
            struct MsgCollector<'a>(&'a mut String);
            impl field::Visit for MsgCollector<'_> {
                fn record_debug(&mut self, field: &field::Field, value: &dyn fmt::Debug) {
                    use std::fmt::Write;
                    write!(self.0, "\n {} = {:?}", field.name(), value).unwrap();
                }
            }

            let meta = event.metadata();
            let file = meta.file().unwrap_or("");
            let line = meta.line().unwrap_or(0);

            let mut msg = format!("[{file}:{line}]");
            event.record(&mut MsgCollector(&mut msg));

            if meta.level() == &Level::ERROR {
                panic!("[LOG-ERROR]{msg}");
            } else {
                eprintln!("[LOG-WARN]{msg}");
            }
        }

        fn enter(&self, _span: &span::Id) {
            unimplemented!()
        }
        fn exit(&self, _span: &span::Id) {
            unimplemented!()
        }
    }

    // Guards the installation so that only the first caller sets the global subscriber.
    static IS_SET: AtomicBool = AtomicBool::new(false);

    if !IS_SET.swap(true, Ordering::Relaxed) {
        if let Err(e) = subscriber::set_global_default(TestSubscriber) {
            panic!("failed to set test log subscriber, {e:?}");
        }
    }
}
131
#[cfg(test)]
/// Temporary directory used by unit tests.
///
/// The directory is deleted when the value is dropped, unless `keep` was called first.
pub struct TestTempDir {
    /// Directory path; `None` once the path was taken out by `keep` or by `drop`.
    path: Option<std::path::PathBuf>,
}
#[cfg(test)]
impl Drop for TestTempDir {
    /// Deletes the directory tree, if this value still owns a path.
    ///
    /// A failure to delete is ignored.
    fn drop(&mut self) {
        let Some(dir) = self.path.take() else {
            return;
        };
        let _ = remove_dir_all::remove_dir_all(dir);
    }
}
#[cfg(test)]
impl TestTempDir {
    /// Creates the directory `name` inside the unit tests temporary root.
    ///
    /// The root is `tmp/unit_tests` inside the build `target` directory when that
    /// directory can be found from the test executable path, otherwise the fallback
    /// inside the system temporary directory.
    ///
    /// # Panics
    ///
    /// Panics if the directory cannot be created.
    pub fn new(name: &str) -> Self {
        let root = match Self::try_target() {
            Some(target) => target,
            None => Self::fallback(),
        };
        let dir = root.join(name);
        if let Err(e) = std::fs::create_dir_all(&dir) {
            panic!("failed to create temp `{}`, {e:?}", dir.display());
        }
        TestTempDir { path: Some(dir) }
    }

    /// Finds `target/tmp/unit_tests` from the current executable path.
    ///
    /// Returns `None` if the executable path cannot be resolved or if its third ancestor
    /// directory is not named `target`.
    fn try_target() -> Option<std::path::PathBuf> {
        let exe = std::env::current_exe().ok()?;
        let exe = dunce::canonicalize(exe).ok()?;
        // Third ancestor: the executable's directory, then two levels further up.
        let target = exe.ancestors().nth(3)?;
        let is_target = target.file_name()?.to_str()? == "target";
        is_target.then(|| target.join("tmp/unit_tests"))
    }

    /// Root used when the `target` directory was not found; logs a warning.
    fn fallback() -> std::path::PathBuf {
        tracing::warn!("using fallback temporary directory");
        std::env::temp_dir().join("zng/unit_tests")
    }

    /// Path of the temporary directory.
    pub fn path(&self) -> &std::path::Path {
        self.path.as_ref().unwrap().as_path()
    }

    /// Consumes the value and returns the directory path without deleting the directory.
    pub fn keep(mut self) -> std::path::PathBuf {
        // Leaves `None` behind so that `drop` finds nothing to delete.
        std::mem::take(&mut self.path).unwrap()
    }
}
#[cfg(test)]
impl std::ops::Deref for TestTempDir {
    type Target = std::path::Path;

    /// Dereferences to the directory path, same as `TestTempDir::path`.
    fn deref(&self) -> &Self::Target {
        TestTempDir::path(self)
    }
}
#[cfg(test)]
impl std::convert::AsRef<std::path::Path> for TestTempDir {
    /// Borrows the directory path, same as `TestTempDir::path`.
    fn as_ref(&self) -> &std::path::Path {
        TestTempDir::path(self)
    }
}
#[cfg(test)]
impl From<&TestTempDir> for std::path::PathBuf {
    /// Copies the directory path into an owned `PathBuf`.
    fn from(dir: &TestTempDir) -> Self {
        dir.path.as_deref().unwrap().to_path_buf()
    }
}