zng_task/http/
file_cache.rs

1use std::{
2    fs::{self, File, OpenOptions},
3    io::{self, Read, Write},
4    path::{Path, PathBuf},
5    pin::Pin,
6    time::SystemTime,
7};
8
9use crate::{
10    self as task,
11    http::util::{lock_exclusive, lock_shared, unlock_ok},
12};
13use zng_unit::TimeUnits;
14
15use super::*;
16
/// Boxed, pinned and `Send` future type returned by the [`HttpCache`] methods of this file.
type Fut<O> = Pin<Box<dyn Future<Output = O> + Send + 'static>>;
18
/// A simple [`HttpCache`] implementation that uses a local directory.
///
/// Each cache entry is stored in its own sub-directory of `dir`. With the `http_cookie`
/// feature the cookie store is also kept in this directory, in a `cookie_jar.json` file.
#[derive(Clone, Debug)]
pub struct FileSystemCache {
    /// Root cache directory.
    dir: PathBuf,
}
24impl FileSystemCache {
25    /// New from cache dir.
26    pub fn new(dir: PathBuf) -> io::Result<Self> {
27        std::fs::create_dir_all(&dir)?;
28
29        Ok(FileSystemCache { dir })
30    }
31
32    async fn entry(&self, key: CacheKey, write: bool) -> Option<CacheEntry> {
33        let dir = self.dir.clone();
34        let key = key.sha_str();
35        task::wait(move || CacheEntry::open(dir.join(key), write)).await
36    }
37
38    #[cfg(feature = "http_cookie")]
39    async fn cookie_jar(&self) -> cookie_store::CookieStore {
40        let s = match crate::fs::read_to_string(self.dir.join(Self::COOKIE_JAR)).await {
41            Ok(s) => s,
42            Err(e) => {
43                if !matches!(e.kind(), io::ErrorKind::NotFound) {
44                    tracing::error!("cannot read cookies store, {e}")
45                }
46                return cookie_store::CookieStore::default();
47            }
48        };
49        match serde_json::from_str(&s) {
50            Ok(j) => j,
51            Err(e) => {
52                tracing::error!("invalid cookie store format, {e}");
53                cookie_store::CookieStore::default()
54            }
55        }
56    }
57
58    #[cfg(feature = "http_cookie")]
59    async fn set_cookie_jar(&self, jar: cookie_store::CookieStore) {
60        let s = serde_json::to_string(&jar).unwrap();
61        if let Err(e) = crate::fs::write(self.dir.join(Self::COOKIE_JAR), s.as_bytes()).await {
62            tracing::error!("cannot write cookie store, {e}")
63        }
64    }
65
66    #[cfg(feature = "http_cookie")]
67    const COOKIE_JAR: &str = "cookie_jar.json";
68}
69impl HttpCache for FileSystemCache {
70    fn policy(&'static self, key: CacheKey) -> Fut<Option<CachePolicy>> {
71        Box::pin(async { self.entry(key, false).await.map(|mut e| e.policy.take().unwrap()) })
72    }
73    fn set_policy(&'static self, key: CacheKey, policy: CachePolicy) -> Fut<bool> {
74        Box::pin(async {
75            if let Some(entry) = self.entry(key, true).await {
76                task::wait(move || entry.write_policy(policy)).await
77            } else {
78                false
79            }
80        })
81    }
82
83    fn body(&'static self, key: CacheKey) -> Fut<Option<IpcBytes>> {
84        Box::pin(async { self.entry(key, false).await?.open_body().await })
85    }
86    fn set(&'static self, key: CacheKey, policy: CachePolicy, body: IpcBytes) -> Fut<()> {
87        Box::pin(async {
88            if let Some(entry) = self.entry(key, true).await {
89                let (entry, ok) = task::wait(move || {
90                    let ok = entry.write_policy(policy);
91                    (entry, ok)
92                })
93                .await;
94                if ok {
95                    entry.write_body(body).await;
96                }
97            }
98        })
99    }
100
101    fn remove(&'static self, key: CacheKey) -> Fut<()> {
102        Box::pin(async {
103            if let Some(entry) = self.entry(key, true).await {
104                task::wait(move || {
105                    CacheEntry::try_delete_locked_dir(&entry.dir, &entry.lock);
106                })
107                .await
108            }
109        })
110    }
111
112    #[cfg(feature = "http_cookie")]
113    fn cookie(&'static self, uri: Uri) -> Fut<Option<http::HeaderValue>> {
114        Box::pin(async move {
115            let jar = self.cookie_jar().await;
116            let mut r = String::new();
117            let mut sep = "";
118            for (key, value) in jar.get_request_values(&uri.to_string().parse().unwrap()) {
119                r.push_str(sep);
120                r.push_str(key);
121                r.push('=');
122                r.push_str(value);
123                sep = "; ";
124            }
125            if r.is_empty() {
126                None
127            } else {
128                match http::HeaderValue::from_str(&r) {
129                    Ok(c) => Some(c),
130                    Err(e) => {
131                        tracing::error!("invalid cookie storage, {e}");
132                        self.set_cookie_jar(jar).await;
133                        None
134                    }
135                }
136            }
137        })
138    }
139    #[cfg(feature = "http_cookie")]
140    fn set_cookie(&'static self, uri: Uri, cookie: http::HeaderValue) -> Fut<()> {
141        Box::pin(async move {
142            let cookie = match cookie.to_str() {
143                Ok(s) => s.to_owned(),
144                Err(e) => {
145                    tracing::error!("cannot store invalid cookie value, {e}");
146                    return;
147                }
148            };
149            let url = uri.to_string().parse().unwrap();
150            let cookies = cookie
151                .split(';')
152                .flat_map(|c| cookie_store::Cookie::parse(c.trim(), &url).ok())
153                .map(|c| (*c).clone().into_owned());
154            let mut jar = self.cookie_jar().await;
155            jar.store_response_cookies(cookies, &url);
156            self.set_cookie_jar(jar).await;
157        })
158    }
159    #[cfg(feature = "http_cookie")]
160    fn remove_cookie(&'static self, uri: Uri) -> Fut<()> {
161        Box::pin(async move {
162            let mut jar = self.cookie_jar().await;
163            let matches = jar.matches(&uri.to_string().parse().unwrap());
164            if matches.is_empty() {
165                return;
166            }
167            let matches: Vec<_> = matches.into_iter().cloned().collect();
168            for c in matches {
169                jar.remove(c.domain().unwrap_or(""), c.path().unwrap_or(""), c.name());
170            }
171            self.set_cookie_jar(jar).await;
172        })
173    }
174
175    fn purge(&'static self) -> Fut<()> {
176        Box::pin(async {
177            let dir = self.dir.clone();
178            task::wait(move || {
179                if let Ok(entries) = std::fs::read_dir(dir) {
180                    for entry in entries.flatten() {
181                        let entry = entry.path();
182                        if entry.is_dir()
183                            && let Ok(lock) = File::open(entry.join(CacheEntry::LOCK))
184                            && lock.try_lock_shared().is_ok()
185                        {
186                            CacheEntry::try_delete_locked_dir(&entry, &lock);
187                        }
188                    }
189                }
190            })
191            .await
192        })
193    }
194
195    fn prune(&'static self) -> Fut<()> {
196        Box::pin(async {
197            let dir = self.dir.clone();
198            task::wait(move || {
199                if let Ok(entries) = std::fs::read_dir(dir) {
200                    let now = SystemTime::now();
201                    let old = (24 * 3).hours();
202
203                    for entry in entries.flatten() {
204                        let entry = entry.path();
205                        if entry.is_dir()
206                            && let Some(entry) = CacheEntry::open(entry, false)
207                        {
208                            let policy = entry.policy.as_ref().unwrap();
209                            if policy.is_stale(now) && policy.age(now) > old {
210                                CacheEntry::try_delete_locked_dir(&entry.dir, &entry.lock);
211                            }
212                        }
213                    }
214                }
215            })
216            .await
217        })
218    }
219}
220
221struct CacheEntry {
222    dir: PathBuf,
223    lock: File,
224
225    policy: Option<CachePolicy>,
226}
227impl CacheEntry {
228    const LOCK: &'static str = ".lock";
229    const WRITING: &'static str = ".w";
230    const POLICY: &'static str = ".policy";
231    const BODY: &'static str = ".body";
232
233    /// Open or create an entry.
234    fn open(dir: PathBuf, write: bool) -> Option<Self> {
235        if write
236            && !dir.exists()
237            && let Err(e) = fs::create_dir_all(&dir)
238        {
239            tracing::error!("cache dir error, {e:?}");
240            return None;
241        }
242
243        let lock = dir.join(Self::LOCK);
244        let mut opt = OpenOptions::new();
245        if write {
246            opt.read(true).write(true).create(true);
247        } else {
248            opt.read(true);
249        }
250
251        let mut lock = match opt.open(lock) {
252            Ok(l) => l,
253            Err(e) if e.kind() == std::io::ErrorKind::NotFound && !dir.exists() => return None,
254            Err(e) => {
255                tracing::error!("cache lock open error, {e:?}");
256                Self::try_delete_dir(&dir);
257                return None;
258            }
259        };
260
261        const TIMEOUT: Duration = Duration::from_secs(10);
262
263        let lock_r = if write {
264            lock_exclusive(&lock, TIMEOUT)
265        } else {
266            lock_shared(&lock, TIMEOUT)
267        };
268        if let Err(e) = lock_r {
269            tracing::error!("cache lock error, {e:?}");
270            Self::try_delete_dir(&dir);
271            return None;
272        }
273
274        let mut version = String::new();
275        if let Err(e) = lock.read_to_string(&mut version) {
276            tracing::error!("cache lock read error, {e:?}");
277            Self::try_delete_locked_dir(&dir, &lock);
278            return None;
279        }
280
281        let expected_version = "zng::http::FileCache 1.0";
282        if version != expected_version {
283            if write && version.is_empty() {
284                if let Err(e) = lock.set_len(0).and_then(|()| lock.write_all(expected_version.as_bytes())) {
285                    tracing::error!("cache lock write error, {e:?}");
286                    Self::try_delete_locked_dir(&dir, &lock);
287                    return None;
288                }
289            } else {
290                tracing::error!("unknown cache version, {version:?}");
291                Self::try_delete_locked_dir(&dir, &lock);
292                return None;
293            }
294        }
295
296        let policy_file = dir.join(Self::POLICY);
297
298        if dir.join(Self::WRITING).exists() {
299            tracing::error!("cache has partial files, removing");
300
301            if write {
302                if let Err(e) = Self::remove_files(&dir) {
303                    tracing::error!("failed to clear partial files, {e:?}");
304                    Self::try_delete_locked_dir(&dir, &lock);
305                    return None;
306                }
307            } else {
308                Self::try_delete_locked_dir(&dir, &lock);
309                return None;
310            }
311        }
312
313        if policy_file.exists() {
314            let policy = match Self::read_policy(&policy_file) {
315                Ok(i) => i,
316                Err(e) => {
317                    tracing::error!("cache policy read error, {e:?}");
318                    Self::try_delete_locked_dir(&dir, &lock);
319                    return None;
320                }
321            };
322
323            Some(Self {
324                dir,
325                lock,
326                policy: Some(policy),
327            })
328        } else if write {
329            Some(Self { dir, lock, policy: None })
330        } else {
331            tracing::error!("cache policy missing");
332            Self::try_delete_locked_dir(&dir, &lock);
333            None
334        }
335    }
336    fn read_policy(file: &Path) -> Result<CachePolicy, Box<dyn std::error::Error>> {
337        let file = std::fs::File::open(file)?;
338        let file = std::io::BufReader::new(file);
339        let policy = serde_json::from_reader(file)?;
340        Ok(policy)
341    }
342
343    /// Replace the .policy content, returns `true` if the entry still exists.
344    pub fn write_policy(&self, policy: CachePolicy) -> bool {
345        let w_tag = if let Some(t) = self.writing_tag() {
346            t
347        } else {
348            return false;
349        };
350
351        if let Err(e) = self.write_policy_impl(policy) {
352            tracing::error!("cache policy serialize error, {e:?}");
353            Self::try_delete_locked_dir(&self.dir, &self.lock);
354            return false;
355        }
356
357        let _ = fs::remove_file(w_tag);
358
359        true
360    }
361    fn write_policy_impl(&self, policy: CachePolicy) -> Result<(), Box<dyn std::error::Error>> {
362        let file = std::fs::File::create(self.dir.join(Self::POLICY))?;
363        serde_json::to_writer(file, &policy)?;
364        Ok(())
365    }
366
367    /// Start reading the body content, returns `Some(_)` if the entry still exists.
368    pub async fn open_body(&self) -> Option<IpcBytes> {
369        let path = self.dir.join(Self::BODY);
370        match task::wait(move || IpcBytes::from_file_blocking(&path)).await {
371            Ok(b) => Some(b),
372            Err(e) => {
373                tracing::error!("cache open body error, {e:?}");
374                Self::try_delete_locked_dir(&self.dir, &self.lock);
375                None
376            }
377        }
378    }
379
380    /// Start downloading and writing a copy of the body to the cache entry.
381    pub async fn write_body(self, body: IpcBytes) {
382        let w_tag = if let Some(t) = self.writing_tag() {
383            t
384        } else {
385            return;
386        };
387
388        if let Err(e) = task::fs::write(self.dir.join(Self::BODY), body).await {
389            tracing::error!("cache body create error, {e:?}");
390            Self::try_delete_locked_dir(&self.dir, &self.lock);
391        } else {
392            let _ = fs::remove_file(w_tag);
393        }
394    }
395
396    fn try_delete_locked_dir(dir: &Path, lock: &File) {
397        let _ = unlock_ok(lock);
398        Self::try_delete_dir(dir);
399    }
400
401    fn try_delete_dir(dir: &Path) {
402        let _ = remove_dir_all::remove_dir_all(dir);
403    }
404
405    fn writing_tag(&self) -> Option<PathBuf> {
406        let tag = self.dir.join(Self::WRITING);
407
408        if let Err(e) = fs::write(&tag, "w") {
409            tracing::error!("cache write tag error, {e:?}");
410            Self::try_delete_locked_dir(&self.dir, &self.lock);
411            None
412        } else {
413            Some(tag)
414        }
415    }
416
417    fn remove_files(dir: &Path) -> std::io::Result<()> {
418        for file in [Self::BODY, Self::POLICY, Self::WRITING] {
419            fs::remove_file(dir.join(file))?
420        }
421        Ok(())
422    }
423}
424impl Drop for CacheEntry {
425    fn drop(&mut self) {
426        if let Err(e) = unlock_ok(&self.lock) {
427            tracing::error!("cache unlock error, {e:?}");
428            Self::try_delete_dir(&self.dir);
429        }
430    }
431}
432
#[cfg(test)]
mod tests {
    use std::time::SystemTime;

    use zng_clone_move::async_clmv;

    use crate::{
        self as task,
        http::{file_cache::FileSystemCache, header::*, util::*, *},
    };
    use zng_unit::*;

    /// Declares `$test` as a leaked `&'static FileSystemCache` rooted in a new temp dir.
    ///
    /// The three-argument form binds the temp dir handle to `$tmp` so the test can reuse its
    /// path. The two-argument form keeps the handle in a macro-internal (hygienic) binding.
    macro_rules! test_cache {
        ($test:tt, $tmp:tt, $tmp_file:tt) => {
            test_log();
            let $tmp = TestTempDir::new($tmp_file);
            let $test: &'static FileSystemCache = Box::leak(Box::new(FileSystemCache::new($tmp.path().to_owned()).unwrap()));
        };
        ($test:tt, $tmp:tt) => {
            test_cache!($test, tmp, $tmp)
        };
    }

    #[test]
    pub fn file_cache_miss() {
        test_cache!(cache, "file_cache_miss");

        let req = Request::get("https://file_cache_miss.invalid/content").unwrap();
        let cache_key = CacheKey::from_request(&req);

        let found = async_test(async move { cache.policy(cache_key).await });

        assert!(found.is_none());
    }

    #[test]
    pub fn file_cache_set_get() {
        test_cache!(cache, "file_cache_set");

        let req = Request::get("https://file_cache_set.invalid/content").unwrap();
        let cache_key = CacheKey::from_request(&req);
        let mut resp = Response::from_msg(StatusCode::OK, "test content.");
        let policy = CachePolicy::new(&req, &resp);

        let text = async_test(async move {
            let content = resp.body().await.unwrap();
            cache.set(cache_key.clone(), policy, content).await;

            let cached = cache.body(cache_key).await.unwrap();
            Response::from_done(StatusCode::OK, HeaderMap::new(), Uri::from_static("/"), Metrics::zero(), cached)
                .body_text()
                .await
                .unwrap()
        });

        assert_eq!(text, "test content.");
    }

    #[test]
    pub fn file_cache_get_cached() {
        test_cache!(cache, tmp, "file_cache_get_cached");

        let req = Request::get("https://file_cache_get_cached.invalid/content").unwrap();
        let cache_key = CacheKey::from_request(&req);
        let mut resp = Response::from_msg(StatusCode::OK, "test content.");
        let policy = CachePolicy::new(&req, &resp);

        async_test(async_clmv!(cache_key, {
            let content = resp.body().await.unwrap();
            cache.set(cache_key, policy, content).await;
        }));

        // a second cache instance on the same directory must see what the first one persisted
        let reopened: &'static FileSystemCache = Box::leak(Box::new(FileSystemCache::new(tmp.path().to_owned()).unwrap()));

        let text = async_test(async move {
            let cached = reopened.body(cache_key).await.unwrap();
            Response::from_done(StatusCode::OK, HeaderMap::new(), Uri::from_static("/"), Metrics::zero(), cached)
                .body_text()
                .await
                .unwrap()
        });

        assert_eq!(text, "test content.");
    }

    #[test]
    pub fn file_cache_get_policy() {
        test_cache!(cache, tmp, "get_etag");

        let req = Request::get("https://get_etag.invalid/content").unwrap();
        let cache_key = CacheKey::from_request(&req);
        let mut resp = Response::from_msg(StatusCode::OK, "test content.");
        let policy = CachePolicy::new(&req, &resp);

        let loaded = async_test(async_clmv!(policy, {
            let content = resp.body().await.unwrap();
            cache.set(cache_key.clone(), policy, content).await;

            // read the policy back through a second cache instance on the same directory
            let reopened: &'static FileSystemCache = Box::leak(Box::new(FileSystemCache::new(tmp.path().to_owned()).unwrap()));
            reopened.policy(cache_key).await.unwrap()
        }));

        let now = SystemTime::now();
        assert_eq!(policy.age(now), loaded.age(now));
    }

    /// Blocks on `fut`, wrapped by `task::with_deadline` with a 30 second deadline; the result of
    /// that wrapper is unwrapped, so an error from it panics.
    #[track_caller]
    fn async_test<F: Future>(fut: F) -> F::Output {
        let bounded = task::with_deadline(fut, 30.secs());
        task::block_on(bounded).unwrap()
    }
}