1use std::{
2 fmt,
3 time::{Duration, SystemTime},
4};
5
6use super::{Body, Error};
7use async_trait::async_trait;
8use serde::*;
9use zng_unit::*;
10
11use http_cache_semantics as hcs;
12
13pub(super) use hcs::BeforeRequest;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct CachePolicy(PolicyInner);
18impl CachePolicy {
19 pub(super) fn new(request: &isahc::Request<super::Body>, response: &isahc::Response<isahc::AsyncBody>) -> Self {
20 let p = hcs::CachePolicy::new_options(
21 request,
22 response,
23 SystemTime::now(),
24 hcs::CacheOptions {
25 shared: false,
26 ignore_cargo_cult: true,
27 ..Default::default()
28 },
29 );
30 Self(PolicyInner::Policy(p))
31 }
32
33 pub(super) fn should_store(&self) -> bool {
34 match &self.0 {
35 PolicyInner::Policy(p) => p.is_storable() && p.time_to_live(SystemTime::now()) > 5.secs(),
36 PolicyInner::Permanent(_) => true,
37 }
38 }
39
40 pub(super) fn new_permanent(response: &isahc::Response<isahc::AsyncBody>) -> Self {
41 let p = PermanentHeader {
42 res: response.headers().clone(),
43 status: response.status(),
44 };
45 Self(PolicyInner::Permanent(p))
46 }
47
48 pub(super) fn is_permanent(&self) -> bool {
49 matches!(self.0, PolicyInner::Permanent(_))
50 }
51
52 pub(super) fn before_request(&self, request: &isahc::Request<super::Body>) -> BeforeRequest {
53 match &self.0 {
54 PolicyInner::Policy(p) => p.before_request(request, SystemTime::now()),
55 PolicyInner::Permanent(p) => BeforeRequest::Fresh(p.parts()),
56 }
57 }
58
59 pub(super) fn after_response(
60 &self,
61 request: &isahc::Request<super::Body>,
62 response: &isahc::Response<isahc::AsyncBody>,
63 ) -> AfterResponse {
64 match &self.0 {
65 PolicyInner::Policy(p) => p.after_response(request, response, SystemTime::now()).into(),
66 PolicyInner::Permanent(_) => unreachable!(), }
68 }
69
70 pub fn age(&self, now: SystemTime) -> Duration {
72 match &self.0 {
73 PolicyInner::Policy(p) => p.age(now),
74 PolicyInner::Permanent(_) => Duration::MAX,
75 }
76 }
77
78 pub fn time_to_live(&self, now: SystemTime) -> Duration {
80 match &self.0 {
81 PolicyInner::Policy(p) => p.time_to_live(now),
82 PolicyInner::Permanent(_) => Duration::MAX,
83 }
84 }
85
86 pub fn is_stale(&self, now: SystemTime) -> bool {
88 match &self.0 {
89 PolicyInner::Policy(p) => p.is_stale(now),
90 PolicyInner::Permanent(_) => false,
91 }
92 }
93}
94impl From<hcs::CachePolicy> for CachePolicy {
95 fn from(p: hcs::CachePolicy) -> Self {
96 CachePolicy(PolicyInner::Policy(p))
97 }
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
101enum PolicyInner {
102 Policy(hcs::CachePolicy),
103 Permanent(PermanentHeader),
104}
105#[derive(Debug, Clone, Serialize, Deserialize)]
106struct PermanentHeader {
107 #[serde(with = "http_serde::header_map")]
108 res: super::header::HeaderMap,
109 #[serde(with = "http_serde::status_code")]
110 status: super::StatusCode,
111}
112impl PermanentHeader {
113 pub fn parts(&self) -> isahc::http::response::Parts {
114 let mut r = isahc::Response::<()>::default().into_parts().0;
115 r.headers = self.res.clone();
116 r.status = self.status;
117 r
118 }
119}
120
121pub(super) enum AfterResponse {
123 NotModified(CachePolicy, isahc::http::response::Parts),
125 Modified(CachePolicy, isahc::http::response::Parts),
127}
128impl From<hcs::AfterResponse> for AfterResponse {
129 fn from(s: hcs::AfterResponse) -> Self {
130 match s {
131 hcs::AfterResponse::NotModified(po, pa) => AfterResponse::NotModified(po.into(), pa),
132 hcs::AfterResponse::Modified(po, pa) => AfterResponse::Modified(po.into(), pa),
133 }
134 }
135}
136
137#[async_trait]
143pub trait CacheDb: Send + Sync + 'static {
144 fn clone_boxed(&self) -> Box<dyn CacheDb>;
146
147 async fn policy(&self, key: &CacheKey) -> Option<CachePolicy>;
149
150 async fn set_policy(&self, key: &CacheKey, policy: CachePolicy) -> bool;
154
155 async fn body(&self, key: &CacheKey) -> Option<Body>;
157
158 async fn set(&self, key: &CacheKey, policy: CachePolicy, body: Body) -> Option<Body>;
166
167 async fn remove(&self, key: &CacheKey);
169
170 async fn purge(&self);
172
173 async fn prune(&self);
177}
178
179#[derive(Debug, Clone, Default)]
187pub enum CacheMode {
188 NoCache,
190
191 #[default]
195 Default,
196
197 Permanent,
201
202 Error(Error),
204}
205
206#[derive(Debug, Clone, PartialEq, Eq, Hash)]
208pub struct CacheKey([u8; 32]);
209impl CacheKey {
210 pub fn from_request(request: &super::Request) -> Self {
212 Self::new(&request.req)
213 }
214
215 pub(super) fn new(request: &isahc::Request<super::Body>) -> Self {
216 let mut headers: Vec<_> = request.headers().iter().map(|(n, v)| (n.clone(), v.clone())).collect();
217
218 headers.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
219
220 use sha2::Digest;
221
222 let mut m = sha2::Sha512_256::new();
223 m.update(request.uri().to_string().as_bytes());
224 m.update(request.method().as_str());
225 for (name, value) in headers {
226 m.update(name.as_str().as_bytes());
227 m.update(value.as_bytes());
228 }
229 let hash = m.finalize();
230
231 CacheKey(hash.into())
232 }
233
234 pub fn sha(&self) -> [u8; 32] {
236 self.0
237 }
238
239 pub fn sha_str(&self) -> String {
241 use base64::*;
242
243 let hash = self.sha();
244 base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(&hash[..])
245 }
246}
247impl fmt::Display for CacheKey {
248 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
249 write!(f, "{}", self.sha_str())
250 }
251}
252
253pub use file_cache::FileSystemCache;
254
255mod file_cache {
256 use std::{
257 fs::{self, File, OpenOptions},
258 io::{self, Read, Write},
259 path::{Path, PathBuf},
260 };
261
262 use crate::http::util::{lock_exclusive, lock_shared, unlock_ok};
263 use crate::{
264 self as task,
265 io::{McBufErrorExt, McBufReader},
266 };
267 use async_trait::async_trait;
268 use fs4::fs_std::FileExt;
269 use zng_unit::TimeUnits;
270
271 use super::*;
272
273 #[derive(Clone)]
292 pub struct FileSystemCache {
293 dir: PathBuf,
294 }
295 impl FileSystemCache {
296 pub fn new(dir: impl Into<PathBuf>) -> io::Result<Self> {
298 let dir = dir.into();
299 std::fs::create_dir_all(&dir)?;
300
301 Ok(FileSystemCache { dir })
302 }
303
304 async fn entry(&self, key: &CacheKey, write: bool) -> Option<CacheEntry> {
305 let dir = self.dir.clone();
306 let key = key.sha_str();
307 task::wait(move || CacheEntry::open(dir.join(key), write)).await
308 }
309 }
310 #[async_trait]
311 impl CacheDb for FileSystemCache {
312 fn clone_boxed(&self) -> Box<dyn CacheDb> {
313 Box::new(self.clone())
314 }
315
316 async fn policy(&self, key: &CacheKey) -> Option<CachePolicy> {
317 self.entry(key, false).await.map(|mut e| e.policy.take().unwrap())
318 }
319 async fn set_policy(&self, key: &CacheKey, policy: CachePolicy) -> bool {
320 if let Some(entry) = self.entry(key, true).await {
321 task::wait(move || entry.write_policy(policy)).await
322 } else {
323 false
324 }
325 }
326
327 async fn body(&self, key: &CacheKey) -> Option<Body> {
328 self.entry(key, false).await?.open_body().await
329 }
330 async fn set(&self, key: &CacheKey, policy: CachePolicy, body: Body) -> Option<Body> {
331 match self.entry(key, true).await {
332 Some(entry) => {
333 let (entry, ok) = task::wait(move || {
334 let ok = entry.write_policy(policy);
335 (entry, ok)
336 })
337 .await;
338
339 if ok { Some(entry.write_body(body).await) } else { Some(body) }
340 }
341 _ => Some(body),
342 }
343 }
344
345 async fn remove(&self, key: &CacheKey) {
346 if let Some(entry) = self.entry(key, true).await {
347 task::wait(move || {
348 CacheEntry::try_delete_locked_dir(&entry.dir, &entry.lock);
349 })
350 .await
351 }
352 }
353
354 async fn purge(&self) {
355 let dir = self.dir.clone();
356 task::wait(move || {
357 if let Ok(entries) = std::fs::read_dir(dir) {
358 for entry in entries.flatten() {
359 let entry = entry.path();
360 if entry.is_dir() {
361 if let Ok(lock) = File::open(entry.join(CacheEntry::LOCK)) {
362 if FileExt::try_lock_shared(&lock).is_ok() {
363 CacheEntry::try_delete_locked_dir(&entry, &lock);
364 }
365 }
366 }
367 }
368 }
369 })
370 .await
371 }
372
373 async fn prune(&self) {
374 let dir = self.dir.clone();
375 task::wait(move || {
376 if let Ok(entries) = std::fs::read_dir(dir) {
377 let now = SystemTime::now();
378 let old = (24 * 3).hours();
379
380 for entry in entries.flatten() {
381 let entry = entry.path();
382 if entry.is_dir() {
383 if let Some(entry) = CacheEntry::open(entry, false) {
384 let policy = entry.policy.as_ref().unwrap();
385 if policy.is_stale(now) && policy.age(now) > old {
386 CacheEntry::try_delete_locked_dir(&entry.dir, &entry.lock);
387 }
388 }
389 }
390 }
391 }
392 })
393 .await
394 }
395 }
396
397 struct CacheEntry {
398 dir: PathBuf,
399 lock: File,
400
401 policy: Option<CachePolicy>,
402 }
403 impl CacheEntry {
404 const LOCK: &'static str = ".lock";
405 const WRITING: &'static str = ".w";
406 const POLICY: &'static str = ".policy";
407 const BODY: &'static str = ".body";
408
409 fn open(dir: PathBuf, write: bool) -> Option<Self> {
411 if write && !dir.exists() {
412 if let Err(e) = fs::create_dir_all(&dir) {
413 tracing::error!("cache dir error, {e:?}");
414 return None;
415 }
416 }
417
418 let lock = dir.join(Self::LOCK);
419 let mut opt = OpenOptions::new();
420 if write {
421 opt.read(true).write(true).create(true);
422 } else {
423 opt.read(true);
424 }
425
426 let mut lock = match opt.open(lock) {
427 Ok(l) => l,
428 Err(e) if e.kind() == std::io::ErrorKind::NotFound && !dir.exists() => return None,
429 Err(e) => {
430 tracing::error!("cache lock open error, {e:?}");
431 Self::try_delete_dir(&dir);
432 return None;
433 }
434 };
435
436 const TIMEOUT: Duration = Duration::from_secs(10);
437
438 let lock_r = if write {
439 lock_exclusive(&lock, TIMEOUT)
440 } else {
441 lock_shared(&lock, TIMEOUT)
442 };
443 if let Err(e) = lock_r {
444 tracing::error!("cache lock error, {e:?}");
445 Self::try_delete_dir(&dir);
446 return None;
447 }
448
449 let mut version = String::new();
450 if let Err(e) = lock.read_to_string(&mut version) {
451 tracing::error!("cache lock read error, {e:?}");
452 Self::try_delete_locked_dir(&dir, &lock);
453 return None;
454 }
455
456 let expected_version = "zng::http::FileCache 1.0";
457 if version != expected_version {
458 if write && version.is_empty() {
459 if let Err(e) = lock.set_len(0).and_then(|()| lock.write_all(expected_version.as_bytes())) {
460 tracing::error!("cache lock write error, {e:?}");
461 Self::try_delete_locked_dir(&dir, &lock);
462 return None;
463 }
464 } else {
465 tracing::error!("unknown cache version, {version:?}");
466 Self::try_delete_locked_dir(&dir, &lock);
467 return None;
468 }
469 }
470
471 let policy_file = dir.join(Self::POLICY);
472
473 if dir.join(Self::WRITING).exists() {
474 tracing::error!("cache has partial files, removing");
475
476 if write {
477 if let Err(e) = Self::remove_files(&dir) {
478 tracing::error!("failed to clear partial files, {e:?}");
479 Self::try_delete_locked_dir(&dir, &lock);
480 return None;
481 }
482 } else {
483 Self::try_delete_locked_dir(&dir, &lock);
484 return None;
485 }
486 }
487
488 if policy_file.exists() {
489 let policy = match Self::read_policy(&policy_file) {
490 Ok(i) => i,
491 Err(e) => {
492 tracing::error!("cache policy read error, {e:?}");
493 Self::try_delete_locked_dir(&dir, &lock);
494 return None;
495 }
496 };
497
498 Some(Self {
499 dir,
500 lock,
501 policy: Some(policy),
502 })
503 } else if write {
504 Some(Self { dir, lock, policy: None })
505 } else {
506 tracing::error!("cache policy missing");
507 Self::try_delete_locked_dir(&dir, &lock);
508 None
509 }
510 }
511 fn read_policy(file: &Path) -> Result<CachePolicy, Box<dyn std::error::Error>> {
512 let file = std::fs::File::open(file)?;
513 let file = std::io::BufReader::new(file);
514 let policy = serde_json::from_reader(file)?;
515 Ok(policy)
516 }
517
518 pub fn write_policy(&self, policy: CachePolicy) -> bool {
520 let w_tag = if let Some(t) = self.writing_tag() {
521 t
522 } else {
523 return false;
524 };
525
526 if let Err(e) = self.write_policy_impl(policy) {
527 tracing::error!("cache policy serialize error, {e:?}");
528 Self::try_delete_locked_dir(&self.dir, &self.lock);
529 return false;
530 }
531
532 let _ = fs::remove_file(w_tag);
533
534 true
535 }
536 fn write_policy_impl(&self, policy: CachePolicy) -> Result<(), Box<dyn std::error::Error>> {
537 let file = std::fs::File::create(self.dir.join(Self::POLICY))?;
538 serde_json::to_writer(file, &policy)?;
539 Ok(())
540 }
541
542 pub async fn open_body(&self) -> Option<Body> {
544 match task::fs::File::open(self.dir.join(Self::BODY)).await {
545 Ok(body) => {
546 if let Ok(metadata) = body.metadata().await {
547 Some(Body::from_reader_sized(task::io::BufReader::new(body), metadata.len()))
548 } else {
549 Some(Body::from_reader(task::io::BufReader::new(body)))
550 }
551 }
552 Err(e) => {
553 tracing::error!("cache open body error, {e:?}");
554 Self::try_delete_locked_dir(&self.dir, &self.lock);
555 None
556 }
557 }
558 }
559
560 pub async fn write_body(self, body: Body) -> Body {
562 let w_tag = if let Some(t) = self.writing_tag() {
563 t
564 } else {
565 return body;
566 };
567
568 match task::fs::File::create(self.dir.join(Self::BODY)).await {
569 Ok(cache_body) => {
570 let cache_body = task::io::BufWriter::new(cache_body);
571 let len = body.len();
572 let mut cache_copy = McBufReader::new(body);
573 let body_copy = cache_copy.clone();
574 cache_copy.set_lazy(true); task::spawn(async move {
577 if let Err(e) = task::io::copy(cache_copy, cache_body).await {
578 if e.is_only_lazy_left() {
579 tracing::warn!("cache cancel");
580 } else {
581 tracing::error!("cache body write error, {e:?}");
582 }
583 Self::try_delete_locked_dir(&self.dir, &self.lock);
585 } else {
586 let _ = fs::remove_file(w_tag);
587 }
588 });
589
590 if let Some(len) = len {
591 Body::from_reader_sized(body_copy, len)
592 } else {
593 Body::from_reader(body_copy)
594 }
595 }
596 Err(e) => {
597 tracing::error!("cache body create error, {e:?}");
598 Self::try_delete_locked_dir(&self.dir, &self.lock);
599 body
600 }
601 }
602 }
603
604 fn try_delete_locked_dir(dir: &Path, lock: &File) {
605 let _ = unlock_ok(lock);
606 Self::try_delete_dir(dir);
607 }
608
609 fn try_delete_dir(dir: &Path) {
610 let _ = remove_dir_all::remove_dir_all(dir);
611 }
612
613 fn writing_tag(&self) -> Option<PathBuf> {
614 let tag = self.dir.join(Self::WRITING);
615
616 if let Err(e) = fs::write(&tag, "w") {
617 tracing::error!("cache write tag error, {e:?}");
618 Self::try_delete_locked_dir(&self.dir, &self.lock);
619 None
620 } else {
621 Some(tag)
622 }
623 }
624
625 fn remove_files(dir: &Path) -> std::io::Result<()> {
626 for file in [Self::BODY, Self::POLICY, Self::WRITING] {
627 fs::remove_file(dir.join(file))?
628 }
629 Ok(())
630 }
631 }
632 impl Drop for CacheEntry {
633 fn drop(&mut self) {
634 if let Err(e) = unlock_ok(&self.lock) {
635 tracing::error!("cache unlock error, {e:?}");
636 Self::try_delete_dir(&self.dir);
637 }
638 }
639 }
640}
641
642#[cfg(test)]
643mod tests {
644 use std::{path::PathBuf, time::SystemTime};
645
646 use zng_clone_move::async_clmv;
647
648 use crate::{
649 self as task,
650 http::{header::*, util::*, *},
651 };
652 use zng_unit::*;
653
654 #[test]
655 pub fn file_cache_miss() {
656 test_log();
657 let tmp = TestTempDir::new("file_cache_miss");
658
659 let test = FileSystemCache::new(&tmp).unwrap();
660 let request = Request::get("https://file_cache_miss.invalid/content").unwrap().build();
661 let key = CacheKey::from_request(&request);
662
663 let r = async_test(async move { test.policy(&key).await });
664
665 assert!(r.is_none());
666 }
667
668 #[test]
669 pub fn file_cache_set_no_headers() {
670 test_log();
671 let tmp = TestTempDir::new("file_cache_set_no_headers");
672
673 let test = FileSystemCache::new(&tmp).unwrap();
674 let request = Request::get("https://file_cache_set_no_headers.invalid/content").unwrap().build();
675 let response = Response::new_message(StatusCode::OK, "test content.");
676
677 let key = CacheKey::from_request(&request);
678 let policy = CachePolicy::new(&request.req, &response.0);
679
680 let (headers, body) = async_test(async move {
681 let (parts, body) = response.into_parts();
682
683 let body = test.set(&key, policy, body).await.unwrap();
684
685 let mut response = Response::from_parts(parts, body);
686
687 let body = response.text().await.unwrap();
688
689 (response.into_parts().0.headers, body)
690 });
691
692 assert_eq!(body, "test content.");
693 assert!(headers.is_empty());
694 }
695
696 #[test]
697 pub fn file_cache_set() {
698 test_log();
699 let tmp = TestTempDir::new("file_cache_set");
700
701 let test = FileSystemCache::new(&tmp).unwrap();
702 let request = Request::get("https://file_cache_set.invalid/content").unwrap().build();
703 let key = CacheKey::from_request(&request);
704
705 let mut headers = HeaderMap::default();
706 headers.insert(header::CONTENT_LENGTH, HeaderValue::from("test content.".len()));
707 let body = Body::from_reader(task::io::Cursor::new("test content."));
708 let response = Response::new(StatusCode::OK, headers, body);
709
710 let policy = CachePolicy::new(&request.req, &response.0);
711
712 let (headers, body) = async_test(async move {
713 let (parts, body) = response.into_parts();
714
715 let body = test.set(&key, policy, body).await.unwrap();
716
717 let mut response = Response::from_parts(parts, body);
718
719 let body = response.text().await.unwrap();
720
721 (response.into_parts().0.headers, body)
722 });
723
724 assert_eq!(
725 headers.get(&header::CONTENT_LENGTH),
726 Some(&HeaderValue::from("test content.".len()))
727 );
728 assert_eq!(body, "test content.");
729 }
730
731 #[test]
732 pub fn file_cache_get_cached() {
733 test_log();
734 let tmp = TestTempDir::new("file_cache_get_cached");
735 let request = Request::get("https://file_cache_get_cached.invalid/content").unwrap().build();
736 let key = CacheKey::from_request(&request);
737
738 let test = FileSystemCache::new(&tmp).unwrap();
739
740 let mut headers = HeaderMap::default();
741 headers.insert(header::CONTENT_LENGTH, HeaderValue::from("test content.".len()));
742 let body = Body::from_reader(task::io::Cursor::new("test content."));
743 let response = Response::new(StatusCode::OK, headers, body);
744
745 let policy = CachePolicy::new(&request.req, &response.0);
746
747 async_test(async_clmv!(key, {
748 let (_, body) = response.into_parts();
749
750 let mut body = test.set(&key, policy, body).await.unwrap();
751 Body::bytes(&mut body).await.unwrap();
752
753 drop(test);
754 }));
755
756 let test = FileSystemCache::new(&tmp).unwrap();
757
758 let body = async_test(async move {
759 let mut body = test.body(&key).await.unwrap();
760
761 body.text_utf8().await.unwrap()
762 });
763
764 assert_eq!(body, "test content.");
765 }
766
767 #[test]
768 pub fn file_cache_get_policy() {
769 test_log();
770 let tmp = TestTempDir::new("get_etag");
771
772 let test = FileSystemCache::new(&tmp).unwrap();
773
774 let request = Request::get("https://get_etag.invalid/content").unwrap().build();
775 let key = CacheKey::from_request(&request);
776
777 let mut headers = HeaderMap::default();
778 headers.insert(header::CONTENT_LENGTH, HeaderValue::from("test content.".len()));
779 let response = Response::new(StatusCode::OK, headers, Body::from_reader(task::io::Cursor::new("test content.")));
780 let policy = CachePolicy::new(&request.req, &response.0);
781
782 let r_policy = async_test(async_clmv!(policy, {
783 let mut body = test.set(&key, policy, response.into_parts().1).await.unwrap();
784 Body::bytes(&mut body).await.unwrap();
785
786 let test = FileSystemCache::new(&tmp).unwrap();
787
788 test.policy(&key).await.unwrap()
789 }));
790
791 let now = SystemTime::now();
792 assert_eq!(policy.age(now), r_policy.age(now));
793 }
794
795 #[test]
796 pub fn file_cache_concurrent_get() {
797 test_log();
798 let tmp = TestTempDir::new("file_cache_concurrent_get");
799 let request = Request::get("https://file_cache_concurrent_get.invalid/content").unwrap().build();
800 let key = CacheKey::from_request(&request);
801
802 let test = FileSystemCache::new(&tmp).unwrap();
803
804 let mut headers = HeaderMap::default();
805 headers.insert(header::CONTENT_LENGTH, HeaderValue::from("test content.".len()));
806 let body = Body::from_reader(task::io::Cursor::new("test content."));
807 let response = Response::new(StatusCode::OK, headers, body);
808 let policy = CachePolicy::new(&request.req, &response.0);
809
810 async_test(async_clmv!(key, {
811 let mut body = test.set(&key, policy, response.into_parts().1).await.unwrap();
812 Body::bytes(&mut body).await.unwrap();
813
814 drop(test);
815 }));
816
817 async_test(async move {
818 let a = concurrent_get(tmp.path().to_owned(), key.clone());
819 let b = concurrent_get(tmp.path().to_owned(), key.clone());
820 let c = concurrent_get(tmp.path().to_owned(), key);
821
822 task::all!(a, b, c).await;
823 });
824 }
825 async fn concurrent_get(tmp: PathBuf, body: CacheKey) {
826 task::run(async move {
827 let test = FileSystemCache::new(&tmp).unwrap();
828
829 let mut body = test.body(&body).await.unwrap();
830
831 let body = body.text_utf8().await.unwrap();
832
833 assert_eq!(body, "test content.");
834 })
835 .await
836 }
837
838 #[test]
839 pub fn file_cache_concurrent_set() {
840 test_log();
841 let tmp = TestTempDir::new("file_cache_concurrent_set");
842 let uri = Uri::try_from("https://file_cache_concurrent_set.invalid/content").unwrap();
843
844 async_test(async move {
845 let a = concurrent_set(tmp.path().to_owned(), uri.clone());
846 let b = concurrent_set(tmp.path().to_owned(), uri.clone());
847 let c = concurrent_set(tmp.path().to_owned(), uri);
848
849 task::all!(a, b, c).await;
850 });
851 }
852 async fn concurrent_set(tmp: PathBuf, uri: Uri) {
853 task::run(async move {
854 let test = FileSystemCache::new(&tmp).unwrap();
855
856 let request = Request::get(uri).unwrap().build();
857 let key = CacheKey::from_request(&request);
858
859 let mut headers = HeaderMap::default();
860 headers.insert(header::CONTENT_LENGTH, HeaderValue::from("test content.".len()));
861 let body = Body::from_reader(task::io::Cursor::new("test content."));
862 let response = Response::new(StatusCode::OK, headers, body);
863
864 let policy = CachePolicy::new(&request.req, &response.0);
865
866 let (headers, body) = async move {
867 let (parts, body) = response.into_parts();
868
869 let body = test.set(&key, policy, body).await.unwrap();
870 let mut response = Response::from_parts(parts, body);
871
872 let body = response.text().await.unwrap();
873
874 (response.into_parts().0.headers, body)
875 }
876 .await;
877
878 assert_eq!(
879 headers.get(&header::CONTENT_LENGTH),
880 Some(&HeaderValue::from("test content.".len()))
881 );
882 assert_eq!(body, "test content.");
883 })
884 .await
885 }
886
887 #[track_caller]
888 fn async_test<F>(test: F) -> F::Output
889 where
890 F: Future,
891 {
892 task::block_on(task::with_deadline(test, 30.secs())).unwrap()
893 }
894}