zng_task/http/
cache.rs

1use std::{
2    fmt,
3    time::{Duration, SystemTime},
4};
5
6use super::{Body, Error};
7use async_trait::async_trait;
8use serde::*;
9use zng_unit::*;
10
11use http_cache_semantics as hcs;
12
13pub(super) use hcs::BeforeRequest;
14
15/// Represents a serializable configuration for a cache entry in a [`CacheDb`].
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct CachePolicy(PolicyInner);
18impl CachePolicy {
19    pub(super) fn new(request: &isahc::Request<super::Body>, response: &isahc::Response<isahc::AsyncBody>) -> Self {
20        let p = hcs::CachePolicy::new_options(
21            request,
22            response,
23            SystemTime::now(),
24            hcs::CacheOptions {
25                shared: false,
26                ignore_cargo_cult: true,
27                ..Default::default()
28            },
29        );
30        Self(PolicyInner::Policy(p))
31    }
32
33    pub(super) fn should_store(&self) -> bool {
34        match &self.0 {
35            PolicyInner::Policy(p) => p.is_storable() && p.time_to_live(SystemTime::now()) > 5.secs(),
36            PolicyInner::Permanent(_) => true,
37        }
38    }
39
40    pub(super) fn new_permanent(response: &isahc::Response<isahc::AsyncBody>) -> Self {
41        let p = PermanentHeader {
42            res: response.headers().clone(),
43            status: response.status(),
44        };
45        Self(PolicyInner::Permanent(p))
46    }
47
48    pub(super) fn is_permanent(&self) -> bool {
49        matches!(self.0, PolicyInner::Permanent(_))
50    }
51
52    pub(super) fn before_request(&self, request: &isahc::Request<super::Body>) -> BeforeRequest {
53        match &self.0 {
54            PolicyInner::Policy(p) => p.before_request(request, SystemTime::now()),
55            PolicyInner::Permanent(p) => BeforeRequest::Fresh(p.parts()),
56        }
57    }
58
59    pub(super) fn after_response(
60        &self,
61        request: &isahc::Request<super::Body>,
62        response: &isahc::Response<isahc::AsyncBody>,
63    ) -> AfterResponse {
64        match &self.0 {
65            PolicyInner::Policy(p) => p.after_response(request, response, SystemTime::now()).into(),
66            PolicyInner::Permanent(_) => unreachable!(), // don't call `after_response` for `Fresh` `before_request`
67        }
68    }
69
70    /// Returns how long the response has been sitting in cache.
71    pub fn age(&self, now: SystemTime) -> Duration {
72        match &self.0 {
73            PolicyInner::Policy(p) => p.age(now),
74            PolicyInner::Permanent(_) => Duration::MAX,
75        }
76    }
77
78    /// Returns approximate time in milliseconds until the response becomes stale.
79    pub fn time_to_live(&self, now: SystemTime) -> Duration {
80        match &self.0 {
81            PolicyInner::Policy(p) => p.time_to_live(now),
82            PolicyInner::Permanent(_) => Duration::MAX,
83        }
84    }
85
86    /// Returns `true` if the cache entry has expired.
87    pub fn is_stale(&self, now: SystemTime) -> bool {
88        match &self.0 {
89            PolicyInner::Policy(p) => p.is_stale(now),
90            PolicyInner::Permanent(_) => false,
91        }
92    }
93}
94impl From<hcs::CachePolicy> for CachePolicy {
95    fn from(p: hcs::CachePolicy) -> Self {
96        CachePolicy(PolicyInner::Policy(p))
97    }
98}
99
/// Serialized content of a [`CachePolicy`].
#[derive(Debug, Clone, Serialize, Deserialize)]
enum PolicyInner {
    /// Standard policy computed by `http-cache-semantics`.
    Policy(hcs::CachePolicy),
    /// Entry that never expires, only the response status and headers are kept.
    Permanent(PermanentHeader),
}
105#[derive(Debug, Clone, Serialize, Deserialize)]
106struct PermanentHeader {
107    #[serde(with = "http_serde::header_map")]
108    res: super::header::HeaderMap,
109    #[serde(with = "http_serde::status_code")]
110    status: super::StatusCode,
111}
112impl PermanentHeader {
113    pub fn parts(&self) -> isahc::http::response::Parts {
114        let mut r = isahc::Response::<()>::default().into_parts().0;
115        r.headers = self.res.clone();
116        r.status = self.status;
117        r
118    }
119}
120
/// New policy and flags to act on `after_response()`.
///
/// Both variants carry the updated [`CachePolicy`] and the response parts to use.
pub(super) enum AfterResponse {
    /// You can use the cached body! Make sure to use these updated headers.
    NotModified(CachePolicy, isahc::http::response::Parts),
    /// You need to update the body in the cache.
    Modified(CachePolicy, isahc::http::response::Parts),
}
128impl From<hcs::AfterResponse> for AfterResponse {
129    fn from(s: hcs::AfterResponse) -> Self {
130        match s {
131            hcs::AfterResponse::NotModified(po, pa) => AfterResponse::NotModified(po.into(), pa),
132            hcs::AfterResponse::Modified(po, pa) => AfterResponse::Modified(po.into(), pa),
133        }
134    }
135}
136
/// Represents a download cache in a [`Client`].
///
/// Cache implementers must store a [`CachePolicy`] and [`Body`] for a given [`CacheKey`].
///
/// [`Client`]: crate::http::Client
#[async_trait]
pub trait CacheDb: Send + Sync + 'static {
    /// Dynamic clone.
    fn clone_boxed(&self) -> Box<dyn CacheDb>;

    /// Retrieves the cache-policy for the given `key`.
    ///
    /// Returns `None` if no policy could be retrieved, [`FileSystemCache`] returns `None`
    /// when the entry does not exist or cannot be opened.
    async fn policy(&self, key: &CacheKey) -> Option<CachePolicy>;

    /// Replaces the cache-policy for the given `key`.
    ///
    /// Returns `false` if the entry does not exist.
    async fn set_policy(&self, key: &CacheKey, policy: CachePolicy) -> bool;

    /// Read/clone the cached body for the given `key`.
    ///
    /// Returns `None` if no body could be retrieved.
    async fn body(&self, key: &CacheKey) -> Option<Body>;

    /// Caches the `policy` and `body` for the given `key`.
    ///
    /// The `body` is fully downloaded and stored into the cache, this method can await for the full download
    /// before returning or return immediately with a body that updates as data is cached.
    ///
    /// In case of error the cache entry is removed, the returned body may continue downloading data if possible.
    /// In case of a cache entry creation error the input `body` may be returned if it was not lost in the error.
    async fn set(&self, key: &CacheKey, policy: CachePolicy, body: Body) -> Option<Body>;

    /// Remove cached policy and body for the given `key`.
    async fn remove(&self, key: &CacheKey);

    /// Remove all cached entries that are not locked in a `set*` operation.
    async fn purge(&self);

    /// Remove cache entries to reduce pressure.
    ///
    /// What entries are removed depends on the cache DB implementer.
    async fn prune(&self);
}
178
/// Cache mode selected for a [`Uri`].
///
/// See [`ClientBuilder::cache_mode`] for more information.
///
/// [`Uri`]: crate::http::Uri
///
/// [`ClientBuilder::cache_mode`]: crate::http::ClientBuilder::cache_mode
#[derive(Debug, Clone, Default)]
pub enum CacheMode {
    /// Always requests the server, never caches the response.
    NoCache,

    /// Follow the standard cache policy as computed by [`http-cache-semantics`].
    ///
    /// This is the default mode.
    ///
    /// [`http-cache-semantics`]: https://docs.rs/http-cache-semantics
    #[default]
    Default,

    /// Always caches the response, overwriting cache control configs.
    ///
    /// If the response is cached returns it instead of requesting an update.
    Permanent,

    /// Returns the error.
    // NOTE(review): presumably the request fails with this error without reaching the server,
    // confirm in `ClientBuilder::cache_mode`.
    Error(Error),
}
205
206/// Represents a SHA-512/256 hash computed from a normalized request.
207#[derive(Debug, Clone, PartialEq, Eq, Hash)]
208pub struct CacheKey([u8; 32]);
209impl CacheKey {
210    /// Compute key from request.
211    pub fn from_request(request: &super::Request) -> Self {
212        Self::new(&request.req)
213    }
214
215    pub(super) fn new(request: &isahc::Request<super::Body>) -> Self {
216        let mut headers: Vec<_> = request.headers().iter().map(|(n, v)| (n.clone(), v.clone())).collect();
217
218        headers.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
219
220        use sha2::Digest;
221
222        let mut m = sha2::Sha512_256::new();
223        m.update(request.uri().to_string().as_bytes());
224        m.update(request.method().as_str());
225        for (name, value) in headers {
226            m.update(name.as_str().as_bytes());
227            m.update(value.as_bytes());
228        }
229        let hash = m.finalize();
230
231        CacheKey(hash.into())
232    }
233
234    /// Returns the SHA-512/256 hash.
235    pub fn sha(&self) -> [u8; 32] {
236        self.0
237    }
238
239    /// Computes a URI safe base64 encoded SHA-512/256 from the key data.
240    pub fn sha_str(&self) -> String {
241        use base64::*;
242
243        let hash = self.sha();
244        base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(&hash[..])
245    }
246}
247impl fmt::Display for CacheKey {
248    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
249        write!(f, "{}", self.sha_str())
250    }
251}
252
253pub use file_cache::FileSystemCache;
254
255mod file_cache {
256    use std::{
257        fs::{self, File, OpenOptions},
258        io::{self, Read, Write},
259        path::{Path, PathBuf},
260    };
261
262    use crate::http::util::{lock_exclusive, lock_shared, unlock_ok};
263    use crate::{
264        self as task,
265        io::{McBufErrorExt, McBufReader},
266    };
267    use async_trait::async_trait;
268    use fs4::fs_std::FileExt;
269    use zng_unit::TimeUnits;
270
271    use super::*;
272
273    /// A simple [`CacheDb`] implementation that uses a local directory.
274    ///
275    /// # Implementation Details
276    ///
277    /// A file lock is used to control data access, read operations use a shared lock so concurrent reads can happen,
278    /// the [`set`] operation uses a exclusive lock for the duration of the body download, so subsequent requests for
279    /// a caching resource will await until the cache is completed to return a body that will then read the cached data.
280    ///
281    /// The [`set`] operation returns a body as soon as the entry is created, the body will receive data as it is downloaded and cached,
282    /// in case of a cache error mid-download the cache entry is removed but the returned body will still download the rest of the data.
283    /// In case of an error creating the entry the original body is always returned so the [`Client`] can continue with a normal
284    /// download also.
285    ///
286    /// The cache does not pull data, only data read by the returned body is written to the cache, dropping the body without reading
287    /// to end cancels the cache entry.
288    ///
289    /// [`Client`]: crate::http::Client
290    /// [`set`]: crate::http::CacheDb::set
291    #[derive(Clone)]
292    pub struct FileSystemCache {
293        dir: PathBuf,
294    }
295    impl FileSystemCache {
296        /// Open the cache in `dir` or create it.
297        pub fn new(dir: impl Into<PathBuf>) -> io::Result<Self> {
298            let dir = dir.into();
299            std::fs::create_dir_all(&dir)?;
300
301            Ok(FileSystemCache { dir })
302        }
303
304        async fn entry(&self, key: &CacheKey, write: bool) -> Option<CacheEntry> {
305            let dir = self.dir.clone();
306            let key = key.sha_str();
307            task::wait(move || CacheEntry::open(dir.join(key), write)).await
308        }
309    }
310    #[async_trait]
311    impl CacheDb for FileSystemCache {
312        fn clone_boxed(&self) -> Box<dyn CacheDb> {
313            Box::new(self.clone())
314        }
315
316        async fn policy(&self, key: &CacheKey) -> Option<CachePolicy> {
317            self.entry(key, false).await.map(|mut e| e.policy.take().unwrap())
318        }
319        async fn set_policy(&self, key: &CacheKey, policy: CachePolicy) -> bool {
320            if let Some(entry) = self.entry(key, true).await {
321                task::wait(move || entry.write_policy(policy)).await
322            } else {
323                false
324            }
325        }
326
327        async fn body(&self, key: &CacheKey) -> Option<Body> {
328            self.entry(key, false).await?.open_body().await
329        }
330        async fn set(&self, key: &CacheKey, policy: CachePolicy, body: Body) -> Option<Body> {
331            match self.entry(key, true).await {
332                Some(entry) => {
333                    let (entry, ok) = task::wait(move || {
334                        let ok = entry.write_policy(policy);
335                        (entry, ok)
336                    })
337                    .await;
338
339                    if ok { Some(entry.write_body(body).await) } else { Some(body) }
340                }
341                _ => Some(body),
342            }
343        }
344
345        async fn remove(&self, key: &CacheKey) {
346            if let Some(entry) = self.entry(key, true).await {
347                task::wait(move || {
348                    CacheEntry::try_delete_locked_dir(&entry.dir, &entry.lock);
349                })
350                .await
351            }
352        }
353
354        async fn purge(&self) {
355            let dir = self.dir.clone();
356            task::wait(move || {
357                if let Ok(entries) = std::fs::read_dir(dir) {
358                    for entry in entries.flatten() {
359                        let entry = entry.path();
360                        if entry.is_dir() {
361                            if let Ok(lock) = File::open(entry.join(CacheEntry::LOCK)) {
362                                if FileExt::try_lock_shared(&lock).is_ok() {
363                                    CacheEntry::try_delete_locked_dir(&entry, &lock);
364                                }
365                            }
366                        }
367                    }
368                }
369            })
370            .await
371        }
372
373        async fn prune(&self) {
374            let dir = self.dir.clone();
375            task::wait(move || {
376                if let Ok(entries) = std::fs::read_dir(dir) {
377                    let now = SystemTime::now();
378                    let old = (24 * 3).hours();
379
380                    for entry in entries.flatten() {
381                        let entry = entry.path();
382                        if entry.is_dir() {
383                            if let Some(entry) = CacheEntry::open(entry, false) {
384                                let policy = entry.policy.as_ref().unwrap();
385                                if policy.is_stale(now) && policy.age(now) > old {
386                                    CacheEntry::try_delete_locked_dir(&entry.dir, &entry.lock);
387                                }
388                            }
389                        }
390                    }
391                }
392            })
393            .await
394        }
395    }
396
    /// An open and locked entry directory of the [`FileSystemCache`].
    ///
    /// The lock is released when the entry is dropped.
    struct CacheEntry {
        /// Entry directory.
        dir: PathBuf,
        /// Open lock file, holds the shared or exclusive file lock.
        lock: File,

        /// Policy read from the entry when it was opened, is `None` for a new entry opened for writing.
        policy: Option<CachePolicy>,
    }
403    impl CacheEntry {
        /// Name of the lock file, its content is the cache version.
        const LOCK: &'static str = ".lock";
        /// Name of the tag file that exists while the entry files are being written.
        const WRITING: &'static str = ".w";
        /// Name of the file that stores the JSON serialized [`CachePolicy`].
        const POLICY: &'static str = ".policy";
        /// Name of the file that stores the response body.
        const BODY: &'static str = ".body";
408
        /// Open or create an entry.
        ///
        /// If `write` is `true` the entry directory and lock file are created if missing and an exclusive lock
        /// is acquired, otherwise the entry must already exist and a shared lock is acquired.
        ///
        /// Returns `None` if the entry does not exist (read only) or if any step fails, in most failure
        /// cases the error is logged and the entry directory is deleted.
        fn open(dir: PathBuf, write: bool) -> Option<Self> {
            if write && !dir.exists() {
                if let Err(e) = fs::create_dir_all(&dir) {
                    tracing::error!("cache dir error, {e:?}");
                    return None;
                }
            }

            let lock = dir.join(Self::LOCK);
            let mut opt = OpenOptions::new();
            if write {
                opt.read(true).write(true).create(true);
            } else {
                opt.read(true);
            }

            let mut lock = match opt.open(lock) {
                Ok(l) => l,
                // entry does not exist, this is a normal cache miss, not an error.
                Err(e) if e.kind() == std::io::ErrorKind::NotFound && !dir.exists() => return None,
                Err(e) => {
                    tracing::error!("cache lock open error, {e:?}");
                    Self::try_delete_dir(&dir);
                    return None;
                }
            };

            // max time waiting for the file lock.
            const TIMEOUT: Duration = Duration::from_secs(10);

            let lock_r = if write {
                lock_exclusive(&lock, TIMEOUT)
            } else {
                lock_shared(&lock, TIMEOUT)
            };
            if let Err(e) = lock_r {
                tracing::error!("cache lock error, {e:?}");
                Self::try_delete_dir(&dir);
                return None;
            }

            // the lock file content is the cache version.
            let mut version = String::new();
            if let Err(e) = lock.read_to_string(&mut version) {
                tracing::error!("cache lock read error, {e:?}");
                Self::try_delete_locked_dir(&dir, &lock);
                return None;
            }

            let expected_version = "zng::http::FileCache 1.0";
            if version != expected_version {
                if write && version.is_empty() {
                    // new lock file, write the version.
                    if let Err(e) = lock.set_len(0).and_then(|()| lock.write_all(expected_version.as_bytes())) {
                        tracing::error!("cache lock write error, {e:?}");
                        Self::try_delete_locked_dir(&dir, &lock);
                        return None;
                    }
                } else {
                    tracing::error!("unknown cache version, {version:?}");
                    Self::try_delete_locked_dir(&dir, &lock);
                    return None;
                }
            }

            let policy_file = dir.join(Self::POLICY);

            // the writing tag is only removed after a write completes, if it is still here
            // the entry files cannot be trusted.
            if dir.join(Self::WRITING).exists() {
                tracing::error!("cache has partial files, removing");

                if write {
                    // keep the dir and lock, the entry will be rewritten.
                    if let Err(e) = Self::remove_files(&dir) {
                        tracing::error!("failed to clear partial files, {e:?}");
                        Self::try_delete_locked_dir(&dir, &lock);
                        return None;
                    }
                } else {
                    Self::try_delete_locked_dir(&dir, &lock);
                    return None;
                }
            }

            if policy_file.exists() {
                let policy = match Self::read_policy(&policy_file) {
                    Ok(i) => i,
                    Err(e) => {
                        tracing::error!("cache policy read error, {e:?}");
                        Self::try_delete_locked_dir(&dir, &lock);
                        return None;
                    }
                };

                Some(Self {
                    dir,
                    lock,
                    policy: Some(policy),
                })
            } else if write {
                // new entry, the policy will be written by the caller.
                Some(Self { dir, lock, policy: None })
            } else {
                tracing::error!("cache policy missing");
                Self::try_delete_locked_dir(&dir, &lock);
                None
            }
        }
511        fn read_policy(file: &Path) -> Result<CachePolicy, Box<dyn std::error::Error>> {
512            let file = std::fs::File::open(file)?;
513            let file = std::io::BufReader::new(file);
514            let policy = serde_json::from_reader(file)?;
515            Ok(policy)
516        }
517
518        /// Replace the .policy content, returns `true` if the entry still exists.
519        pub fn write_policy(&self, policy: CachePolicy) -> bool {
520            let w_tag = if let Some(t) = self.writing_tag() {
521                t
522            } else {
523                return false;
524            };
525
526            if let Err(e) = self.write_policy_impl(policy) {
527                tracing::error!("cache policy serialize error, {e:?}");
528                Self::try_delete_locked_dir(&self.dir, &self.lock);
529                return false;
530            }
531
532            let _ = fs::remove_file(w_tag);
533
534            true
535        }
536        fn write_policy_impl(&self, policy: CachePolicy) -> Result<(), Box<dyn std::error::Error>> {
537            let file = std::fs::File::create(self.dir.join(Self::POLICY))?;
538            serde_json::to_writer(file, &policy)?;
539            Ok(())
540        }
541
542        /// Start reading the body content, returns `Some(_)` if the entry still exists.
543        pub async fn open_body(&self) -> Option<Body> {
544            match task::fs::File::open(self.dir.join(Self::BODY)).await {
545                Ok(body) => {
546                    if let Ok(metadata) = body.metadata().await {
547                        Some(Body::from_reader_sized(task::io::BufReader::new(body), metadata.len()))
548                    } else {
549                        Some(Body::from_reader(task::io::BufReader::new(body)))
550                    }
551                }
552                Err(e) => {
553                    tracing::error!("cache open body error, {e:?}");
554                    Self::try_delete_locked_dir(&self.dir, &self.lock);
555                    None
556                }
557            }
558        }
559
        /// Start downloading and writing a copy of the body to the cache entry.
        ///
        /// Returns a body that reads the same data as the input `body`, the data is copied to the .body file
        /// by a spawned task as it is read. The entry, and so the file lock, is moved to the spawned task.
        ///
        /// If the writing tag or the .body file cannot be created the input `body` is returned.
        pub async fn write_body(self, body: Body) -> Body {
            let w_tag = if let Some(t) = self.writing_tag() {
                t
            } else {
                return body;
            };

            match task::fs::File::create(self.dir.join(Self::BODY)).await {
                Ok(cache_body) => {
                    let cache_body = task::io::BufWriter::new(cache_body);
                    let len = body.len();
                    let mut cache_copy = McBufReader::new(body);
                    // the returned body reads from this clone.
                    let body_copy = cache_copy.clone();
                    cache_copy.set_lazy(true); // don't read more than body, gets error if body is dropped before EOF.

                    task::spawn(async move {
                        if let Err(e) = task::io::copy(cache_copy, cache_body).await {
                            if e.is_only_lazy_left() {
                                tracing::warn!("cache cancel");
                            } else {
                                tracing::error!("cache body write error, {e:?}");
                            }
                            // cleanup partial download, stopped by error or by user dropping body reader.
                            Self::try_delete_locked_dir(&self.dir, &self.lock);
                        } else {
                            // write completed, the entry is valid.
                            let _ = fs::remove_file(w_tag);
                        }
                    });

                    if let Some(len) = len {
                        Body::from_reader_sized(body_copy, len)
                    } else {
                        Body::from_reader(body_copy)
                    }
                }
                Err(e) => {
                    tracing::error!("cache body create error, {e:?}");
                    Self::try_delete_locked_dir(&self.dir, &self.lock);
                    body
                }
            }
        }
603
        /// Unlocks `lock` and then tries to delete the entry `dir`, errors are ignored.
        fn try_delete_locked_dir(dir: &Path, lock: &File) {
            let _ = unlock_ok(lock);
            Self::try_delete_dir(dir);
        }
608
        /// Tries to delete the entry `dir` and all its content, errors are ignored.
        fn try_delete_dir(dir: &Path) {
            let _ = remove_dir_all::remove_dir_all(dir);
        }
612
613        fn writing_tag(&self) -> Option<PathBuf> {
614            let tag = self.dir.join(Self::WRITING);
615
616            if let Err(e) = fs::write(&tag, "w") {
617                tracing::error!("cache write tag error, {e:?}");
618                Self::try_delete_locked_dir(&self.dir, &self.lock);
619                None
620            } else {
621                Some(tag)
622            }
623        }
624
625        fn remove_files(dir: &Path) -> std::io::Result<()> {
626            for file in [Self::BODY, Self::POLICY, Self::WRITING] {
627                fs::remove_file(dir.join(file))?
628            }
629            Ok(())
630        }
631    }
632    impl Drop for CacheEntry {
633        fn drop(&mut self) {
634            if let Err(e) = unlock_ok(&self.lock) {
635                tracing::error!("cache unlock error, {e:?}");
636                Self::try_delete_dir(&self.dir);
637            }
638        }
639    }
640}
641
642#[cfg(test)]
643mod tests {
644    use std::{path::PathBuf, time::SystemTime};
645
646    use zng_clone_move::async_clmv;
647
648    use crate::{
649        self as task,
650        http::{header::*, util::*, *},
651    };
652    use zng_unit::*;
653
654    #[test]
655    pub fn file_cache_miss() {
656        test_log();
657        let tmp = TestTempDir::new("file_cache_miss");
658
659        let test = FileSystemCache::new(&tmp).unwrap();
660        let request = Request::get("https://file_cache_miss.invalid/content").unwrap().build();
661        let key = CacheKey::from_request(&request);
662
663        let r = async_test(async move { test.policy(&key).await });
664
665        assert!(r.is_none());
666    }
667
668    #[test]
669    pub fn file_cache_set_no_headers() {
670        test_log();
671        let tmp = TestTempDir::new("file_cache_set_no_headers");
672
673        let test = FileSystemCache::new(&tmp).unwrap();
674        let request = Request::get("https://file_cache_set_no_headers.invalid/content").unwrap().build();
675        let response = Response::new_message(StatusCode::OK, "test content.");
676
677        let key = CacheKey::from_request(&request);
678        let policy = CachePolicy::new(&request.req, &response.0);
679
680        let (headers, body) = async_test(async move {
681            let (parts, body) = response.into_parts();
682
683            let body = test.set(&key, policy, body).await.unwrap();
684
685            let mut response = Response::from_parts(parts, body);
686
687            let body = response.text().await.unwrap();
688
689            (response.into_parts().0.headers, body)
690        });
691
692        assert_eq!(body, "test content.");
693        assert!(headers.is_empty());
694    }
695
696    #[test]
697    pub fn file_cache_set() {
698        test_log();
699        let tmp = TestTempDir::new("file_cache_set");
700
701        let test = FileSystemCache::new(&tmp).unwrap();
702        let request = Request::get("https://file_cache_set.invalid/content").unwrap().build();
703        let key = CacheKey::from_request(&request);
704
705        let mut headers = HeaderMap::default();
706        headers.insert(header::CONTENT_LENGTH, HeaderValue::from("test content.".len()));
707        let body = Body::from_reader(task::io::Cursor::new("test content."));
708        let response = Response::new(StatusCode::OK, headers, body);
709
710        let policy = CachePolicy::new(&request.req, &response.0);
711
712        let (headers, body) = async_test(async move {
713            let (parts, body) = response.into_parts();
714
715            let body = test.set(&key, policy, body).await.unwrap();
716
717            let mut response = Response::from_parts(parts, body);
718
719            let body = response.text().await.unwrap();
720
721            (response.into_parts().0.headers, body)
722        });
723
724        assert_eq!(
725            headers.get(&header::CONTENT_LENGTH),
726            Some(&HeaderValue::from("test content.".len()))
727        );
728        assert_eq!(body, "test content.");
729    }
730
731    #[test]
732    pub fn file_cache_get_cached() {
733        test_log();
734        let tmp = TestTempDir::new("file_cache_get_cached");
735        let request = Request::get("https://file_cache_get_cached.invalid/content").unwrap().build();
736        let key = CacheKey::from_request(&request);
737
738        let test = FileSystemCache::new(&tmp).unwrap();
739
740        let mut headers = HeaderMap::default();
741        headers.insert(header::CONTENT_LENGTH, HeaderValue::from("test content.".len()));
742        let body = Body::from_reader(task::io::Cursor::new("test content."));
743        let response = Response::new(StatusCode::OK, headers, body);
744
745        let policy = CachePolicy::new(&request.req, &response.0);
746
747        async_test(async_clmv!(key, {
748            let (_, body) = response.into_parts();
749
750            let mut body = test.set(&key, policy, body).await.unwrap();
751            Body::bytes(&mut body).await.unwrap();
752
753            drop(test);
754        }));
755
756        let test = FileSystemCache::new(&tmp).unwrap();
757
758        let body = async_test(async move {
759            let mut body = test.body(&key).await.unwrap();
760
761            body.text_utf8().await.unwrap()
762        });
763
764        assert_eq!(body, "test content.");
765    }
766
767    #[test]
768    pub fn file_cache_get_policy() {
769        test_log();
770        let tmp = TestTempDir::new("get_etag");
771
772        let test = FileSystemCache::new(&tmp).unwrap();
773
774        let request = Request::get("https://get_etag.invalid/content").unwrap().build();
775        let key = CacheKey::from_request(&request);
776
777        let mut headers = HeaderMap::default();
778        headers.insert(header::CONTENT_LENGTH, HeaderValue::from("test content.".len()));
779        let response = Response::new(StatusCode::OK, headers, Body::from_reader(task::io::Cursor::new("test content.")));
780        let policy = CachePolicy::new(&request.req, &response.0);
781
782        let r_policy = async_test(async_clmv!(policy, {
783            let mut body = test.set(&key, policy, response.into_parts().1).await.unwrap();
784            Body::bytes(&mut body).await.unwrap();
785
786            let test = FileSystemCache::new(&tmp).unwrap();
787
788            test.policy(&key).await.unwrap()
789        }));
790
791        let now = SystemTime::now();
792        assert_eq!(policy.age(now), r_policy.age(now));
793    }
794
795    #[test]
796    pub fn file_cache_concurrent_get() {
797        test_log();
798        let tmp = TestTempDir::new("file_cache_concurrent_get");
799        let request = Request::get("https://file_cache_concurrent_get.invalid/content").unwrap().build();
800        let key = CacheKey::from_request(&request);
801
802        let test = FileSystemCache::new(&tmp).unwrap();
803
804        let mut headers = HeaderMap::default();
805        headers.insert(header::CONTENT_LENGTH, HeaderValue::from("test content.".len()));
806        let body = Body::from_reader(task::io::Cursor::new("test content."));
807        let response = Response::new(StatusCode::OK, headers, body);
808        let policy = CachePolicy::new(&request.req, &response.0);
809
810        async_test(async_clmv!(key, {
811            let mut body = test.set(&key, policy, response.into_parts().1).await.unwrap();
812            Body::bytes(&mut body).await.unwrap();
813
814            drop(test);
815        }));
816
817        async_test(async move {
818            let a = concurrent_get(tmp.path().to_owned(), key.clone());
819            let b = concurrent_get(tmp.path().to_owned(), key.clone());
820            let c = concurrent_get(tmp.path().to_owned(), key);
821
822            task::all!(a, b, c).await;
823        });
824    }
825    async fn concurrent_get(tmp: PathBuf, body: CacheKey) {
826        task::run(async move {
827            let test = FileSystemCache::new(&tmp).unwrap();
828
829            let mut body = test.body(&body).await.unwrap();
830
831            let body = body.text_utf8().await.unwrap();
832
833            assert_eq!(body, "test content.");
834        })
835        .await
836    }
837
838    #[test]
839    pub fn file_cache_concurrent_set() {
840        test_log();
841        let tmp = TestTempDir::new("file_cache_concurrent_set");
842        let uri = Uri::try_from("https://file_cache_concurrent_set.invalid/content").unwrap();
843
844        async_test(async move {
845            let a = concurrent_set(tmp.path().to_owned(), uri.clone());
846            let b = concurrent_set(tmp.path().to_owned(), uri.clone());
847            let c = concurrent_set(tmp.path().to_owned(), uri);
848
849            task::all!(a, b, c).await;
850        });
851    }
852    async fn concurrent_set(tmp: PathBuf, uri: Uri) {
853        task::run(async move {
854            let test = FileSystemCache::new(&tmp).unwrap();
855
856            let request = Request::get(uri).unwrap().build();
857            let key = CacheKey::from_request(&request);
858
859            let mut headers = HeaderMap::default();
860            headers.insert(header::CONTENT_LENGTH, HeaderValue::from("test content.".len()));
861            let body = Body::from_reader(task::io::Cursor::new("test content."));
862            let response = Response::new(StatusCode::OK, headers, body);
863
864            let policy = CachePolicy::new(&request.req, &response.0);
865
866            let (headers, body) = async move {
867                let (parts, body) = response.into_parts();
868
869                let body = test.set(&key, policy, body).await.unwrap();
870                let mut response = Response::from_parts(parts, body);
871
872                let body = response.text().await.unwrap();
873
874                (response.into_parts().0.headers, body)
875            }
876            .await;
877
878            assert_eq!(
879                headers.get(&header::CONTENT_LENGTH),
880                Some(&HeaderValue::from("test content.".len()))
881            );
882            assert_eq!(body, "test content.");
883        })
884        .await
885    }
886
887    #[track_caller]
888    fn async_test<F>(test: F) -> F::Output
889    where
890        F: Future,
891    {
892        task::block_on(task::with_deadline(test, 30.secs())).unwrap()
893    }
894}