zng_task/http/
util.rs

1use std::time::Duration;
2
3/// Calls [`fs4::FileExt::lock_exclusive`] with a timeout.
4pub fn lock_exclusive(file: &impl fs4::fs_std::FileExt, timeout: Duration) -> std::io::Result<()> {
5    lock_timeout(file, |f| f.try_lock_exclusive(), timeout)
6}
7
8/// Calls [`fs4::FileExt::lock_shared`] with a timeout.
9pub fn lock_shared(file: &impl fs4::fs_std::FileExt, timeout: Duration) -> std::io::Result<()> {
10    lock_timeout(file, |f| f.try_lock_shared(), timeout)
11}
12
13fn lock_timeout<F: fs4::fs_std::FileExt>(
14    file: &F,
15    try_lock: impl Fn(&F) -> std::io::Result<()>,
16    mut timeout: Duration,
17) -> std::io::Result<()> {
18    let mut locked_error = None;
19    loop {
20        match try_lock(file) {
21            Ok(()) => return Ok(()),
22            Err(e) => {
23                if e.kind() != std::io::ErrorKind::WouldBlock
24                    && e.raw_os_error() != locked_error.get_or_insert_with(fs4::lock_contended_error).raw_os_error()
25                {
26                    return Err(e);
27                }
28
29                const INTERVAL: Duration = Duration::from_millis(10);
30                timeout = timeout.saturating_sub(INTERVAL);
31                if timeout.is_zero() {
32                    return Err(std::io::Error::new(std::io::ErrorKind::TimedOut, e));
33                } else {
34                    std::thread::sleep(INTERVAL.min(timeout));
35                }
36            }
37        }
38    }
39}
40
41/// Calls [`fs4::FileExt::unlock`] and ignores "already unlocked" errors.
42pub fn unlock_ok(file: &impl fs4::fs_std::FileExt) -> std::io::Result<()> {
43    if let Err(e) = file.unlock() {
44        if let Some(code) = e.raw_os_error() {
45            #[cfg(windows)]
46            if code == 158 {
47                // ERROR_NOT_LOCKED
48                return Ok(());
49            }
50
51            #[cfg(unix)]
52            if code == 22 {
53                // EINVAL
54                return Ok(());
55            }
56        }
57
58        Err(e)
59    } else {
60        Ok(())
61    }
62}
63
/// Sets a `tracing` subscriber that writes warnings to stderr and panics on errors.
///
/// Only events are observed, spans are never enabled. The subscriber is installed by the first
/// call, subsequent calls do nothing.
///
/// Panics if another different subscriber is already set.
#[cfg(test)]
pub fn test_log() {
    use std::sync::atomic::*;

    use std::fmt;
    use tracing::*;

    struct TestSubscriber;
    impl Subscriber for TestSubscriber {
        fn enabled(&self, metadata: &Metadata<'_>) -> bool {
            // `tracing` orders levels by verbosity, ERROR is the lowest, so `<=` is needed
            // to include WARN itself, `<` only lets ERROR through.
            metadata.is_event() && metadata.level() <= &Level::WARN
        }

        // Span callbacks are not implemented, `enabled` rejects everything that is not an event.

        fn new_span(&self, _span: &span::Attributes<'_>) -> span::Id {
            unimplemented!()
        }

        fn record(&self, _span: &span::Id, _values: &span::Record<'_>) {
            unimplemented!()
        }

        fn record_follows_from(&self, _span: &span::Id, _follows: &span::Id) {
            unimplemented!()
        }

        fn event(&self, event: &Event<'_>) {
            // Appends each event field as a "name = value" line.
            struct MsgCollector<'a>(&'a mut String);
            impl field::Visit for MsgCollector<'_> {
                fn record_debug(&mut self, field: &field::Field, value: &dyn fmt::Debug) {
                    use std::fmt::Write;
                    write!(self.0, "\n  {} = {:?}", field.name(), value).unwrap();
                }
            }

            let meta = event.metadata();
            let file = meta.file().unwrap_or("");
            let line = meta.line().unwrap_or(0);

            let mut msg = format!("[{file}:{line}]");
            event.record(&mut MsgCollector(&mut msg));

            if meta.level() == &Level::ERROR {
                // logged errors fail the test.
                panic!("[LOG-ERROR]{msg}");
            } else {
                eprintln!("[LOG-WARN]{msg}");
            }
        }

        fn enter(&self, _span: &span::Id) {
            unimplemented!()
        }
        fn exit(&self, _span: &span::Id) {
            unimplemented!()
        }
    }

    // Only the first caller tries to install the subscriber.
    static IS_SET: AtomicBool = AtomicBool::new(false);

    if !IS_SET.swap(true, Ordering::Relaxed) {
        if let Err(e) = subscriber::set_global_default(TestSubscriber) {
            panic!("failed to set test log subscriber, {e:?}");
        }
    }
}
131
/// A temporary directory for unit tests.
///
/// Directory is "target/tmp/unit_tests/<name>" with fallback to "zng/unit_tests/<name>" inside the
/// system temporary directory if the target folder is not found.
///
/// Auto cleanup on drop, unless the directory was released with `keep`.
#[cfg(test)]
pub struct TestTempDir {
    // `Some` while this value owns the directory, taken (left `None`) by `keep` and by `drop`,
    // only a path that is still `Some` on drop is removed.
    path: Option<std::path::PathBuf>,
}
#[cfg(test)]
impl Drop for TestTempDir {
    /// Removes the directory and its contents, unless the path was already taken.
    fn drop(&mut self) {
        let Some(dir) = self.path.take() else {
            return;
        };
        // best-effort cleanup, a failure to remove is ignored.
        let _ = remove_dir_all::remove_dir_all(dir);
    }
}
#[cfg(test)]
impl TestTempDir {
    /// Creates the temporary directory for a test, `name` must be unique for each test.
    ///
    /// # Panics
    ///
    /// Panics if the directory cannot be created.
    pub fn new(name: &str) -> Self {
        let base = match Self::try_target() {
            Some(dir) => dir,
            None => Self::fallback(),
        };
        let path = base.join(name);
        if let Err(e) = std::fs::create_dir_all(&path) {
            panic!("failed to create temp `{}`, {e:?}", path.display());
        }
        TestTempDir { path: Some(path) }
    }

    /// Locates "target/tmp/unit_tests" from the path of the running test executable.
    ///
    /// Returns `None` if the executable path cannot be resolved or if its third ancestor
    /// is not a directory named "target".
    fn try_target() -> Option<std::path::PathBuf> {
        let exe = std::env::current_exe().ok()?;
        let exe = dunce::canonicalize(exe).ok()?;
        // expected layout is target/debug/deps/<exe>, ancestors are [exe, deps, debug, target, ..]
        let target = exe.ancestors().nth(3)?;
        let dir_name = target.file_name()?.to_str()?;
        (dir_name == "target").then(|| target.join("tmp/unit_tests"))
    }

    /// Base directory inside the system temporary directory, logs a warning when used.
    fn fallback() -> std::path::PathBuf {
        tracing::warn!("using fallback temporary directory");
        let mut dir = std::env::temp_dir();
        dir.push("zng/unit_tests");
        dir
    }

    /// Borrows the path of the temporary directory.
    pub fn path(&self) -> &std::path::Path {
        self.path.as_ref().map(|p| p.as_path()).unwrap()
    }

    /// Consumes `self` and leaves the temporary files in place.
    ///
    /// Returns the path to the temporary directory.
    pub fn keep(mut self) -> std::path::PathBuf {
        // with the path taken the `Drop` impl has nothing left to remove.
        std::mem::take(&mut self.path).unwrap()
    }
}
#[cfg(test)]
impl std::ops::Deref for TestTempDir {
    type Target = std::path::Path;

    /// Dereferences to the temporary directory path.
    fn deref(&self) -> &std::path::Path {
        self.path.as_deref().unwrap()
    }
}
#[cfg(test)]
impl std::convert::AsRef<std::path::Path> for TestTempDir {
    /// Borrows the temporary directory path.
    fn as_ref(&self) -> &std::path::Path {
        self.path.as_deref().unwrap()
    }
}
#[cfg(test)]
impl<'a> From<&'a TestTempDir> for std::path::PathBuf {
    /// Copies the temporary directory path, the directory is still owned by `a`.
    fn from(a: &'a TestTempDir) -> Self {
        a.path().to_path_buf()
    }
}