1use std::{
2 fs::{self, File, OpenOptions},
3 io::{self, Read, Write},
4 path::{Path, PathBuf},
5 pin::Pin,
6 time::SystemTime,
7};
8
9use crate::{
10 self as task,
11 http::util::{lock_exclusive, lock_shared, unlock_ok},
12};
13use zng_unit::TimeUnits;
14
15use super::*;
16
/// Boxed, pinned, `Send` future returned by the [`HttpCache`] methods implemented in this file.
type Fut<O> = Pin<Box<dyn Future<Output = O> + Send + 'static>>;
18
19#[derive(Clone)]
21pub struct FileSystemCache {
22 dir: PathBuf,
23}
24impl FileSystemCache {
25 pub fn new(dir: PathBuf) -> io::Result<Self> {
27 std::fs::create_dir_all(&dir)?;
28
29 Ok(FileSystemCache { dir })
30 }
31
32 async fn entry(&self, key: CacheKey, write: bool) -> Option<CacheEntry> {
33 let dir = self.dir.clone();
34 let key = key.sha_str();
35 task::wait(move || CacheEntry::open(dir.join(key), write)).await
36 }
37
38 #[cfg(feature = "http_cookie")]
39 async fn cookie_jar(&self) -> cookie_store::CookieStore {
40 let s = match crate::fs::read_to_string(self.dir.join(Self::COOKIE_JAR)).await {
41 Ok(s) => s,
42 Err(e) => {
43 if !matches!(e.kind(), io::ErrorKind::NotFound) {
44 tracing::error!("cannot read cookies store, {e}")
45 }
46 return cookie_store::CookieStore::default();
47 }
48 };
49 match serde_json::from_str(&s) {
50 Ok(j) => j,
51 Err(e) => {
52 tracing::error!("invalid cookie store format, {e}");
53 cookie_store::CookieStore::default()
54 }
55 }
56 }
57
58 #[cfg(feature = "http_cookie")]
59 async fn set_cookie_jar(&self, jar: cookie_store::CookieStore) {
60 let s = serde_json::to_string(&jar).unwrap();
61 if let Err(e) = crate::fs::write(self.dir.join(Self::COOKIE_JAR), s.as_bytes()).await {
62 tracing::error!("cannot write cookie store, {e}")
63 }
64 }
65
66 #[cfg(feature = "http_cookie")]
67 const COOKIE_JAR: &str = "cookie_jar.json";
68}
69impl HttpCache for FileSystemCache {
70 fn policy(&'static self, key: CacheKey) -> Fut<Option<CachePolicy>> {
71 Box::pin(async { self.entry(key, false).await.map(|mut e| e.policy.take().unwrap()) })
72 }
73 fn set_policy(&'static self, key: CacheKey, policy: CachePolicy) -> Fut<bool> {
74 Box::pin(async {
75 if let Some(entry) = self.entry(key, true).await {
76 task::wait(move || entry.write_policy(policy)).await
77 } else {
78 false
79 }
80 })
81 }
82
83 fn body(&'static self, key: CacheKey) -> Fut<Option<IpcBytes>> {
84 Box::pin(async { self.entry(key, false).await?.open_body().await })
85 }
86 fn set(&'static self, key: CacheKey, policy: CachePolicy, body: IpcBytes) -> Fut<()> {
87 Box::pin(async {
88 if let Some(entry) = self.entry(key, true).await {
89 let (entry, ok) = task::wait(move || {
90 let ok = entry.write_policy(policy);
91 (entry, ok)
92 })
93 .await;
94 if ok {
95 entry.write_body(body).await;
96 }
97 }
98 })
99 }
100
101 fn remove(&'static self, key: CacheKey) -> Fut<()> {
102 Box::pin(async {
103 if let Some(entry) = self.entry(key, true).await {
104 task::wait(move || {
105 CacheEntry::try_delete_locked_dir(&entry.dir, &entry.lock);
106 })
107 .await
108 }
109 })
110 }
111
112 #[cfg(feature = "http_cookie")]
113 fn cookie(&'static self, uri: Uri) -> Fut<Option<http::HeaderValue>> {
114 Box::pin(async move {
115 let jar = self.cookie_jar().await;
116 let mut r = String::new();
117 let mut sep = "";
118 for (key, value) in jar.get_request_values(&uri.to_string().parse().unwrap()) {
119 r.push_str(sep);
120 r.push_str(key);
121 r.push('=');
122 r.push_str(value);
123 sep = "; ";
124 }
125 if r.is_empty() {
126 None
127 } else {
128 match http::HeaderValue::from_str(&r) {
129 Ok(c) => Some(c),
130 Err(e) => {
131 tracing::error!("invalid cookie storage, {e}");
132 self.set_cookie_jar(jar).await;
133 None
134 }
135 }
136 }
137 })
138 }
139 #[cfg(feature = "http_cookie")]
140 fn set_cookie(&'static self, uri: Uri, cookie: http::HeaderValue) -> Fut<()> {
141 Box::pin(async move {
142 let cookie = match cookie.to_str() {
143 Ok(s) => s.to_owned(),
144 Err(e) => {
145 tracing::error!("cannot store invalid cookie value, {e}");
146 return;
147 }
148 };
149 let url = uri.to_string().parse().unwrap();
150 let cookies = cookie
151 .split(';')
152 .flat_map(|c| cookie_store::Cookie::parse(c.trim(), &url).ok())
153 .map(|c| (*c).clone().into_owned());
154 let mut jar = self.cookie_jar().await;
155 jar.store_response_cookies(cookies, &url);
156 self.set_cookie_jar(jar).await;
157 })
158 }
159 #[cfg(feature = "http_cookie")]
160 fn remove_cookie(&'static self, uri: Uri) -> Fut<()> {
161 Box::pin(async move {
162 let mut jar = self.cookie_jar().await;
163 let matches = jar.matches(&uri.to_string().parse().unwrap());
164 if matches.is_empty() {
165 return;
166 }
167 let matches: Vec<_> = matches.into_iter().cloned().collect();
168 for c in matches {
169 jar.remove(c.domain().unwrap_or(""), c.path().unwrap_or(""), c.name());
170 }
171 self.set_cookie_jar(jar).await;
172 })
173 }
174
175 fn purge(&'static self) -> Fut<()> {
176 Box::pin(async {
177 let dir = self.dir.clone();
178 task::wait(move || {
179 if let Ok(entries) = std::fs::read_dir(dir) {
180 for entry in entries.flatten() {
181 let entry = entry.path();
182 if entry.is_dir()
183 && let Ok(lock) = File::open(entry.join(CacheEntry::LOCK))
184 && lock.try_lock_shared().is_ok()
185 {
186 CacheEntry::try_delete_locked_dir(&entry, &lock);
187 }
188 }
189 }
190 })
191 .await
192 })
193 }
194
195 fn prune(&'static self) -> Fut<()> {
196 Box::pin(async {
197 let dir = self.dir.clone();
198 task::wait(move || {
199 if let Ok(entries) = std::fs::read_dir(dir) {
200 let now = SystemTime::now();
201 let old = (24 * 3).hours();
202
203 for entry in entries.flatten() {
204 let entry = entry.path();
205 if entry.is_dir()
206 && let Some(entry) = CacheEntry::open(entry, false)
207 {
208 let policy = entry.policy.as_ref().unwrap();
209 if policy.is_stale(now) && policy.age(now) > old {
210 CacheEntry::try_delete_locked_dir(&entry.dir, &entry.lock);
211 }
212 }
213 }
214 }
215 })
216 .await
217 })
218 }
219}
220
221struct CacheEntry {
222 dir: PathBuf,
223 lock: File,
224
225 policy: Option<CachePolicy>,
226}
227impl CacheEntry {
228 const LOCK: &'static str = ".lock";
229 const WRITING: &'static str = ".w";
230 const POLICY: &'static str = ".policy";
231 const BODY: &'static str = ".body";
232
233 fn open(dir: PathBuf, write: bool) -> Option<Self> {
235 if write
236 && !dir.exists()
237 && let Err(e) = fs::create_dir_all(&dir)
238 {
239 tracing::error!("cache dir error, {e:?}");
240 return None;
241 }
242
243 let lock = dir.join(Self::LOCK);
244 let mut opt = OpenOptions::new();
245 if write {
246 opt.read(true).write(true).create(true);
247 } else {
248 opt.read(true);
249 }
250
251 let mut lock = match opt.open(lock) {
252 Ok(l) => l,
253 Err(e) if e.kind() == std::io::ErrorKind::NotFound && !dir.exists() => return None,
254 Err(e) => {
255 tracing::error!("cache lock open error, {e:?}");
256 Self::try_delete_dir(&dir);
257 return None;
258 }
259 };
260
261 const TIMEOUT: Duration = Duration::from_secs(10);
262
263 let lock_r = if write {
264 lock_exclusive(&lock, TIMEOUT)
265 } else {
266 lock_shared(&lock, TIMEOUT)
267 };
268 if let Err(e) = lock_r {
269 tracing::error!("cache lock error, {e:?}");
270 Self::try_delete_dir(&dir);
271 return None;
272 }
273
274 let mut version = String::new();
275 if let Err(e) = lock.read_to_string(&mut version) {
276 tracing::error!("cache lock read error, {e:?}");
277 Self::try_delete_locked_dir(&dir, &lock);
278 return None;
279 }
280
281 let expected_version = "zng::http::FileCache 1.0";
282 if version != expected_version {
283 if write && version.is_empty() {
284 if let Err(e) = lock.set_len(0).and_then(|()| lock.write_all(expected_version.as_bytes())) {
285 tracing::error!("cache lock write error, {e:?}");
286 Self::try_delete_locked_dir(&dir, &lock);
287 return None;
288 }
289 } else {
290 tracing::error!("unknown cache version, {version:?}");
291 Self::try_delete_locked_dir(&dir, &lock);
292 return None;
293 }
294 }
295
296 let policy_file = dir.join(Self::POLICY);
297
298 if dir.join(Self::WRITING).exists() {
299 tracing::error!("cache has partial files, removing");
300
301 if write {
302 if let Err(e) = Self::remove_files(&dir) {
303 tracing::error!("failed to clear partial files, {e:?}");
304 Self::try_delete_locked_dir(&dir, &lock);
305 return None;
306 }
307 } else {
308 Self::try_delete_locked_dir(&dir, &lock);
309 return None;
310 }
311 }
312
313 if policy_file.exists() {
314 let policy = match Self::read_policy(&policy_file) {
315 Ok(i) => i,
316 Err(e) => {
317 tracing::error!("cache policy read error, {e:?}");
318 Self::try_delete_locked_dir(&dir, &lock);
319 return None;
320 }
321 };
322
323 Some(Self {
324 dir,
325 lock,
326 policy: Some(policy),
327 })
328 } else if write {
329 Some(Self { dir, lock, policy: None })
330 } else {
331 tracing::error!("cache policy missing");
332 Self::try_delete_locked_dir(&dir, &lock);
333 None
334 }
335 }
336 fn read_policy(file: &Path) -> Result<CachePolicy, Box<dyn std::error::Error>> {
337 let file = std::fs::File::open(file)?;
338 let file = std::io::BufReader::new(file);
339 let policy = serde_json::from_reader(file)?;
340 Ok(policy)
341 }
342
343 pub fn write_policy(&self, policy: CachePolicy) -> bool {
345 let w_tag = if let Some(t) = self.writing_tag() {
346 t
347 } else {
348 return false;
349 };
350
351 if let Err(e) = self.write_policy_impl(policy) {
352 tracing::error!("cache policy serialize error, {e:?}");
353 Self::try_delete_locked_dir(&self.dir, &self.lock);
354 return false;
355 }
356
357 let _ = fs::remove_file(w_tag);
358
359 true
360 }
361 fn write_policy_impl(&self, policy: CachePolicy) -> Result<(), Box<dyn std::error::Error>> {
362 let file = std::fs::File::create(self.dir.join(Self::POLICY))?;
363 serde_json::to_writer(file, &policy)?;
364 Ok(())
365 }
366
367 pub async fn open_body(&self) -> Option<IpcBytes> {
369 let path = self.dir.join(Self::BODY);
370 match task::wait(move || IpcBytes::from_file_blocking(&path)).await {
371 Ok(b) => Some(b),
372 Err(e) => {
373 tracing::error!("cache open body error, {e:?}");
374 Self::try_delete_locked_dir(&self.dir, &self.lock);
375 None
376 }
377 }
378 }
379
380 pub async fn write_body(self, body: IpcBytes) {
382 let w_tag = if let Some(t) = self.writing_tag() {
383 t
384 } else {
385 return;
386 };
387
388 if let Err(e) = task::fs::write(self.dir.join(Self::BODY), body).await {
389 tracing::error!("cache body create error, {e:?}");
390 Self::try_delete_locked_dir(&self.dir, &self.lock);
391 } else {
392 let _ = fs::remove_file(w_tag);
393 }
394 }
395
396 fn try_delete_locked_dir(dir: &Path, lock: &File) {
397 let _ = unlock_ok(lock);
398 Self::try_delete_dir(dir);
399 }
400
401 fn try_delete_dir(dir: &Path) {
402 let _ = remove_dir_all::remove_dir_all(dir);
403 }
404
405 fn writing_tag(&self) -> Option<PathBuf> {
406 let tag = self.dir.join(Self::WRITING);
407
408 if let Err(e) = fs::write(&tag, "w") {
409 tracing::error!("cache write tag error, {e:?}");
410 Self::try_delete_locked_dir(&self.dir, &self.lock);
411 None
412 } else {
413 Some(tag)
414 }
415 }
416
417 fn remove_files(dir: &Path) -> std::io::Result<()> {
418 for file in [Self::BODY, Self::POLICY, Self::WRITING] {
419 fs::remove_file(dir.join(file))?
420 }
421 Ok(())
422 }
423}
424impl Drop for CacheEntry {
425 fn drop(&mut self) {
426 if let Err(e) = unlock_ok(&self.lock) {
427 tracing::error!("cache unlock error, {e:?}");
428 Self::try_delete_dir(&self.dir);
429 }
430 }
431}
432
#[cfg(test)]
mod tests {
    use std::time::SystemTime;

    use zng_clone_move::async_clmv;

    use crate::{
        self as task,
        http::{file_cache::FileSystemCache, header::*, util::*, *},
    };
    use zng_unit::*;

    /// Init test logging and declare `$test`, a leaked `&'static FileSystemCache` over a new
    /// temp dir named `$tmp_file`. The three-argument form also binds the temp dir to `$tmp`.
    macro_rules! test_cache {
        ($test:tt, $tmp:tt, $tmp_file:tt) => {
            test_log();
            let $tmp = TestTempDir::new($tmp_file);
            // `HttpCache` methods take `&'static self`, so the cache is leaked for the test
            let $test: &'static FileSystemCache = Box::leak(Box::new(FileSystemCache::new($tmp.path().to_owned()).unwrap()));
        };
        ($test:tt, $tmp:tt) => {
            test_cache!($test, tmp, $tmp)
        };
    }

    /// A key never stored has no policy.
    #[test]
    pub fn file_cache_miss() {
        test_cache!(test, "file_cache_miss");

        let request = Request::get("https://file_cache_miss.invalid/content").unwrap();
        let key = CacheKey::from_request(&request);

        let r = async_test(async move { test.policy(key).await });

        assert!(r.is_none());
    }

    /// A stored body can be read back through the same cache instance.
    #[test]
    pub fn file_cache_set_get() {
        test_cache!(test, "file_cache_set_get");

        let request = Request::get("https://file_cache_set_get.invalid/content").unwrap();
        let key = CacheKey::from_request(&request);

        let mut response = Response::from_msg(StatusCode::OK, "test content.");

        let policy = CachePolicy::new(&request, &response);

        let body = async_test(async move {
            test.set(key.clone(), policy, response.body().await.unwrap()).await;
            let body = test.body(key).await.unwrap();
            Response::from_done(StatusCode::OK, HeaderMap::new(), Uri::from_static("/"), Metrics::zero(), body)
                .body_text()
                .await
                .unwrap()
        });

        assert_eq!(body, "test content.");
    }

    /// A stored body can be read back by a new cache instance over the same directory.
    #[test]
    pub fn file_cache_get_cached() {
        test_cache!(test, tmp, "file_cache_get_cached");

        let request = Request::get("https://file_cache_get_cached.invalid/content").unwrap();
        let key = CacheKey::from_request(&request);

        let mut response = Response::from_msg(StatusCode::OK, "test content.");

        let policy = CachePolicy::new(&request, &response);

        async_test(async_clmv!(key, {
            test.set(key.clone(), policy, response.body().await.unwrap()).await;
        }));

        let test: &'static FileSystemCache = Box::leak(Box::new(FileSystemCache::new(tmp.path().to_owned()).unwrap()));

        let body = async_test(async move {
            let body = test.body(key).await.unwrap();
            Response::from_done(StatusCode::OK, HeaderMap::new(), Uri::from_static("/"), Metrics::zero(), body)
                .body_text()
                .await
                .unwrap()
        });

        assert_eq!(body, "test content.");
    }

    /// A stored policy round-trips through a new cache instance over the same directory.
    #[test]
    pub fn file_cache_get_policy() {
        test_cache!(test, tmp, "file_cache_get_policy");

        let request = Request::get("https://file_cache_get_policy.invalid/content").unwrap();
        let key = CacheKey::from_request(&request);

        let mut response = Response::from_msg(StatusCode::OK, "test content.");
        let policy = CachePolicy::new(&request, &response);

        let r_policy = async_test(async_clmv!(policy, {
            test.set(key.clone(), policy, response.body().await.unwrap()).await;

            let test: &'static FileSystemCache = Box::leak(Box::new(FileSystemCache::new(tmp.path().to_owned()).unwrap()));

            test.policy(key).await.unwrap()
        }));

        let now = SystemTime::now();
        assert_eq!(policy.age(now), r_policy.age(now));
    }

    /// Block on `test` with a 30 seconds deadline, panicking if it times out.
    #[track_caller]
    fn async_test<F>(test: F) -> F::Output
    where
        F: Future,
    {
        task::block_on(task::with_deadline(test, 30.secs())).unwrap()
    }
}