zng_task/
http.rs

1#![cfg(feature = "http")]
2// suppress nag about very simple boxed closure signatures.
3#![expect(clippy::type_complexity)]
4
5//! HTTP client.
6//!
//! This module is a thin wrapper around the [`isahc`] crate that limits the API surface to
//! `async` methods without the async suffix. You can convert between the [`isahc`] types and the types in this module.
9//!
10//! # Examples
11//!
12//! Get some text:
13//!
14//! ```
15//! # use zng_task as task;
16//! # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
17//! let text = task::http::get_txt("https://httpbin.org/base64/SGVsbG8gV29ybGQ=").await?;
18//! println!("{text}!");
19//! # Ok(()) }
20//! ```
21//!
22//! [`isahc`]: https://docs.rs/isahc
23
24mod cache;
25mod util;
26
27pub use cache::*;
28use zng_var::impl_from_and_into_var;
29
30use std::convert::TryFrom;
31use std::error::Error as StdError;
32use std::pin::Pin;
33use std::sync::Arc;
34use std::time::Duration;
35use std::{fmt, mem};
36
37use crate::Progress;
38
39use super::io::AsyncRead;
40
41use isahc::config::Configurable;
42pub use isahc::config::RedirectPolicy;
43pub use isahc::cookies::{Cookie, CookieJar};
44pub use isahc::http::{Method, StatusCode, Uri, header, uri};
45
46use futures_lite::io::{AsyncReadExt, BufReader};
47use isahc::{AsyncReadResponseExt, ResponseExt};
48use parking_lot::{Mutex, const_mutex};
49
50use zng_txt::{Txt, formatx};
51use zng_unit::*;
52
53/// Marker trait for types that try-to-convert to [`Uri`].
54///
55/// All types `T` that match `Uri: TryFrom<T>, <Uri as TryFrom<T>>::Error: Into<isahc::http::Error>` implement this trait.
56#[diagnostic::on_unimplemented(note = "`TryUri` is implemented for all `T` where `Uri: TryFrom<T, Error: Into<isahc::http::Error>>`")]
57pub trait TryUri {
58    /// Tries to convert `self` into [`Uri`].
59    fn try_uri(self) -> Result<Uri, Error>;
60}
61impl<U> TryUri for U
62where
63    Uri: TryFrom<U>,
64    <Uri as TryFrom<U>>::Error: Into<isahc::http::Error>,
65{
66    fn try_uri(self) -> Result<Uri, Error> {
67        Uri::try_from(self).map_err(|e| e.into().into())
68    }
69}
70
71/// Marker trait for types that try-to-convert to [`Method`].
72///
73/// All types `T` that match `Method: TryFrom<T>, <Method as TryFrom<T>>::Error: Into<isahc::http::Error>` implement this trait.
74#[diagnostic::on_unimplemented(note = "`TryMethod` is implemented for all `T` where `Method: TryFrom<T, Error: Into<isahc::http::Error>>`")]
75pub trait TryMethod {
76    /// Tries to convert `self` into [`Method`].
77    fn try_method(self) -> Result<Method, Error>;
78}
79impl<U> TryMethod for U
80where
81    Method: TryFrom<U>,
82    <isahc::http::Method as TryFrom<U>>::Error: Into<isahc::http::Error>,
83{
84    fn try_method(self) -> Result<Method, Error> {
85        Method::try_from(self).map_err(|e| e.into().into())
86    }
87}
88
89/// Marker trait for types that try-to-convert to [`Body`].
90///
91/// All types `T` that match `isahc::AsyncBody: TryFrom<T>, <isahc::AsyncBody as TryFrom<T>>::Error: Into<isahc::http::Error>`
92/// implement this trait.
93#[diagnostic::on_unimplemented(note = "`TryBody` is implemented for all `T` where `Body: TryFrom<T, Error: Into<isahc::http::Error>>`")]
94pub trait TryBody {
95    /// Tries to convert `self` into [`Body`].
96    fn try_body(self) -> Result<Body, Error>;
97}
98impl<U> TryBody for U
99where
100    isahc::AsyncBody: TryFrom<U>,
101    <isahc::AsyncBody as TryFrom<U>>::Error: Into<isahc::http::Error>,
102{
103    fn try_body(self) -> Result<Body, Error> {
104        match isahc::AsyncBody::try_from(self) {
105            Ok(r) => Ok(Body(r)),
106            Err(e) => Err(e.into().into()),
107        }
108    }
109}
110
111/// Marker trait for types that try-to-convert to [`header::HeaderName`].
112///
113/// All types `T` that match `header::HeaderName: TryFrom<T>, <header::HeaderName as TryFrom<T>>::Error: Into<isahc::http::Error>`
114/// implement this trait.
115#[diagnostic::on_unimplemented(
116    note = "`TryHeaderName` is implemented for all `T` where `HeaderName: TryFrom<T, Error: Into<isahc::http::Error>>`"
117)]
118pub trait TryHeaderName {
119    /// Tries to convert `self` into [`Body`].
120    fn try_header_name(self) -> Result<header::HeaderName, Error>;
121}
122impl<U> TryHeaderName for U
123where
124    header::HeaderName: TryFrom<U>,
125    <header::HeaderName as TryFrom<U>>::Error: Into<isahc::http::Error>,
126{
127    fn try_header_name(self) -> Result<header::HeaderName, Error> {
128        header::HeaderName::try_from(self).map_err(|e| e.into().into())
129    }
130}
131
132/// Marker trait for types that try-to-convert to [`header::HeaderValue`].
133///
134/// All types `T` that match `header::HeaderValue: TryFrom<T>, <header::HeaderValue as TryFrom<T>>::Error: Into<isahc::http::Error>`
135/// implement this trait.
136#[diagnostic::on_unimplemented(
137    note = "`TryHeaderValue` is implemented for all `T` where `HeaderValue: TryFrom<T, Error: Into<isahc::http::Error>>`"
138)]
139pub trait TryHeaderValue {
140    /// Tries to convert `self` into [`Body`].
141    fn try_header_value(self) -> Result<header::HeaderValue, Error>;
142}
143impl<U> TryHeaderValue for U
144where
145    header::HeaderValue: TryFrom<U>,
146    <header::HeaderValue as TryFrom<U>>::Error: Into<isahc::http::Error>,
147{
148    fn try_header_value(self) -> Result<header::HeaderValue, Error> {
149        header::HeaderValue::try_from(self).map_err(|e| e.into().into())
150    }
151}
152
153/// HTTP request.
154///
155/// Use [`send`] to send a request.
156#[derive(Debug)]
157pub struct Request {
158    req: isahc::Request<Body>,
159    limits: ResponseLimits,
160}
161impl Request {
162    /// Starts an empty builder.
163    ///
164    /// # Examples
165    ///
166    /// ```
167    /// use zng_task::http;
168    ///
169    /// # fn try_example() -> Result<(), Box<dyn std::error::Error>> {
170    /// let request = http::Request::builder().method(http::Method::PUT)?.uri("https://httpbin.org/put")?.build();
171    /// # Ok(()) }
172    /// ```
173    ///
174    /// Call [`build`] or [`body`] to finish building the request, note that there are is also an associated function
175    /// to start a builder for each HTTP method and uri.
176    ///
177    /// [`build`]: RequestBuilder::build
178    /// [`body`]: RequestBuilder::body
179    pub fn builder() -> RequestBuilder {
180        RequestBuilder::start(isahc::Request::builder())
181    }
182
183    /// Starts building a GET request.
184    ///
185    /// # Examples
186    ///
187    /// ```
188    /// use zng_task::http;
189    ///
190    /// # fn try_example() -> Result<(), Box<dyn std::error::Error>> {
191    /// let get = http::Request::get("https://httpbin.org/get")?.build();
192    /// # Ok(()) }
193    /// ```
194    pub fn get(uri: impl TryUri) -> Result<RequestBuilder, Error> {
195        Ok(RequestBuilder::start(isahc::Request::get(uri.try_uri()?)))
196    }
197
198    /// Starts building a PUT request.
199    ///
200    /// # Examples
201    ///
202    /// ```
203    /// use zng_task::http;
204    ///
205    /// # fn try_example() -> Result<(), Box<dyn std::error::Error>> {
206    /// let put = http::Request::put("https://httpbin.org/put")?.header("accept", "application/json")?.build();
207    /// # Ok(()) }
208    /// ```
209    pub fn put(uri: impl TryUri) -> Result<RequestBuilder, Error> {
210        Ok(RequestBuilder::start(isahc::Request::put(uri.try_uri()?)))
211    }
212
213    /// Starts building a POST request.
214    ///
215    /// # Examples
216    ///
217    /// ```
218    /// use zng_task::http;
219    ///
220    /// # fn try_example() -> Result<(), Box<dyn std::error::Error>> {
221    /// let post = http::Request::post("https://httpbin.org/post")?.header("accept", "application/json")?.build();
222    /// # Ok(()) }
223    /// ```
224    pub fn post(uri: impl TryUri) -> Result<RequestBuilder, Error> {
225        Ok(RequestBuilder::start(isahc::Request::post(uri.try_uri()?)))
226    }
227
228    /// Starts building a DELETE request.
229    ///
230    /// # Examples
231    ///
232    /// ```
233    /// use zng_task::http;
234    ///
235    /// # fn try_example() -> Result<(), Box<dyn std::error::Error>> {
236    /// let delete = http::Request::delete("https://httpbin.org/delete")?.header("accept", "application/json")?.build();
237    /// # Ok(()) }
238    /// ```
239    pub fn delete(uri: impl TryUri) -> Result<RequestBuilder, Error> {
240        Ok(RequestBuilder::start(isahc::Request::delete(uri.try_uri()?)))
241    }
242
243    /// Starts building a PATCH request.
244    ///
245    /// # Examples
246    ///
247    /// ```
248    /// use zng_task::http;
249    ///
250    /// # fn try_example() -> Result<(), Box<dyn std::error::Error>> {
251    /// let patch = http::Request::patch("https://httpbin.org/patch")?.header("accept", "application/json")?.build();
252    /// # Ok(()) }
253    /// ```
254    pub fn patch(uri: impl TryUri) -> Result<RequestBuilder, Error> {
255        Ok(RequestBuilder::start(isahc::Request::patch(uri.try_uri()?)))
256    }
257
258    /// Starts building a HEAD request.
259    ///
260    /// # Examples
261    ///
262    /// ```
263    /// use zng_task::http;
264    ///
265    /// # fn try_example() -> Result<(), Box<dyn std::error::Error>> {
266    /// let head = http::Request::head("https://httpbin.org")?.build();
267    /// # Ok(()) }
268    /// ```
269    pub fn head(uri: impl TryUri) -> Result<RequestBuilder, Error> {
270        Ok(RequestBuilder::start(isahc::Request::head(uri.try_uri()?)))
271    }
272
273    /// Returns a reference to the associated URI.
274    pub fn uri(&self) -> &Uri {
275        self.req.uri()
276    }
277
278    /// Returns a reference to the associated HTTP method.
279    pub fn method(&self) -> &Method {
280        self.req.method()
281    }
282
283    /// Returns a reference to the associated header field map.
284    pub fn headers(&self) -> &header::HeaderMap {
285        self.req.headers()
286    }
287
288    /// Create a clone of the request method, URI, version and headers, with a new `body`.
289    pub fn clone_with(&self, body: impl TryBody) -> Result<Self, Error> {
290        let body = body.try_body()?;
291
292        let mut req = isahc::Request::new(body);
293        *req.method_mut() = self.req.method().clone();
294        *req.uri_mut() = self.req.uri().clone();
295        *req.version_mut() = self.req.version();
296        let headers = req.headers_mut();
297        for (name, value) in self.headers() {
298            headers.insert(name.clone(), value.clone());
299        }
300
301        Ok(Self {
302            req,
303            limits: self.limits.clone(),
304        })
305    }
306}
307
308#[derive(Debug, Default, Clone)]
309struct ResponseLimits {
310    max_length: Option<ByteLength>,
311    require_length: bool,
312}
313impl ResponseLimits {
314    fn check(&self, response: isahc::Response<isahc::AsyncBody>) -> Result<isahc::Response<isahc::AsyncBody>, Error> {
315        if self.require_length || self.max_length.is_some() {
316            let response = Response(response);
317            if let Some(len) = response.content_len() {
318                if let Some(max) = self.max_length {
319                    if max < len {
320                        return Err(Error::MaxLength {
321                            content_length: Some(len),
322                            max_length: max,
323                        });
324                    }
325                }
326            } else if self.require_length {
327                return Err(Error::RequireLength);
328            }
329
330            if let Some(max) = self.max_length {
331                let (parts, body) = response.0.into_parts();
332                let response = isahc::Response::from_parts(
333                    parts,
334                    isahc::AsyncBody::from_reader(super::io::ReadLimited::new(body, max, move || {
335                        std::io::Error::new(std::io::ErrorKind::InvalidData, MaxLengthError(None, max))
336                    })),
337                );
338
339                Ok(response)
340            } else {
341                Ok(response.0)
342            }
343        } else {
344            Ok(response)
345        }
346    }
347}
348
349/// A [`Request`] builder.
350///
351/// You can use [`Request::builder`] to start an empty builder.
352#[derive(Debug)]
353pub struct RequestBuilder {
354    builder: isahc::http::request::Builder,
355    limits: ResponseLimits,
356}
357impl Default for RequestBuilder {
358    fn default() -> Self {
359        Request::builder()
360    }
361}
362impl RequestBuilder {
363    /// New default request builder.
364    pub fn new() -> Self {
365        Request::builder()
366    }
367
368    fn start(builder: isahc::http::request::Builder) -> Self {
369        Self {
370            builder,
371            limits: ResponseLimits::default(),
372        }
373    }
374
375    /// Set the HTTP method for this request.
376    pub fn method(self, method: impl TryMethod) -> Result<Self, Error> {
377        Ok(Self {
378            builder: self.builder.method(method.try_method()?),
379            limits: self.limits,
380        })
381    }
382
383    /// Set the URI for this request.
384    pub fn uri(self, uri: impl TryUri) -> Result<Self, Error> {
385        Ok(Self {
386            builder: self.builder.uri(uri.try_uri()?),
387            limits: self.limits,
388        })
389    }
390
391    /// Appends a header to this request.
392    pub fn header(self, name: impl TryHeaderName, value: impl TryHeaderValue) -> Result<Self, Error> {
393        Ok(Self {
394            builder: self.builder.header(name.try_header_name()?, value.try_header_value()?),
395            limits: self.limits,
396        })
397    }
398
399    /// Set a cookie jar to use to accept, store, and supply cookies for incoming responses and outgoing requests.
400    ///
401    /// Note that the [`default_client`] already has a cookie jar.
402    pub fn cookie_jar(self, cookie_jar: CookieJar) -> Self {
403        Self {
404            builder: self.builder.cookie_jar(cookie_jar),
405            limits: self.limits,
406        }
407    }
408
409    /// Specify a maximum amount of time that a complete request/response cycle is allowed to
410    /// take before being aborted. This includes DNS resolution, connecting to the server,
411    /// writing the request, and reading the response.
412    ///
413    /// Note that this includes the response read operation, so if you get a response but don't
414    /// read-it within this timeout you will get a [`TimedOut`] IO error.
415    ///
416    /// By default no timeout is used.
417    ///
418    /// [`TimedOut`]: https://doc.rust-lang.org/nightly/std/io/enum.ErrorKind.html#variant.TimedOut
419    pub fn timeout(self, timeout: Duration) -> Self {
420        Self {
421            builder: self.builder.timeout(timeout),
422            limits: self.limits,
423        }
424    }
425
426    /// Set a timeout for establishing connections to a host.
427    ///
428    /// If not set, the [`default_client`] default of 90 seconds will be used.
429    pub fn connect_timeout(self, timeout: Duration) -> Self {
430        Self {
431            builder: self.builder.connect_timeout(timeout),
432            limits: self.limits,
433        }
434    }
435
436    /// Specify a maximum amount of time where transfer rate can go below a minimum speed limit.
437    ///
438    /// The `low_speed` limit is in bytes/s. No low-speed limit is configured by default.
439    pub fn low_speed_timeout(self, low_speed: u32, timeout: Duration) -> Self {
440        Self {
441            builder: self.builder.low_speed_timeout(low_speed, timeout),
442            limits: self.limits,
443        }
444    }
445
446    /// Set a policy for automatically following server redirects.
447    ///
448    /// If enabled the "Referer" header will be set automatically too.
449    ///
450    /// The [`default_client`] follows up-to 20 redirects.
451    pub fn redirect_policy(self, policy: RedirectPolicy) -> Self {
452        if !matches!(policy, RedirectPolicy::None) {
453            Self {
454                builder: self.builder.redirect_policy(policy).auto_referer(),
455                limits: self.limits,
456            }
457        } else {
458            Self {
459                builder: self.builder.redirect_policy(policy),
460                limits: self.limits,
461            }
462        }
463    }
464
465    /// Enable or disable automatic decompression of the response body.
466    ///
467    /// If enabled the "Accept-Encoding" will also be set automatically, if it was not set using [`header`].
468    ///
469    /// This is enabled by default.
470    ///
471    /// [`header`]: Self::header
472    pub fn auto_decompress(self, enabled: bool) -> Self {
473        Self {
474            builder: self.builder.automatic_decompression(enabled),
475            limits: self.limits,
476        }
477    }
478
479    /// Set a maximum upload speed for the request body, in bytes per second.
480    pub fn max_upload_speed(self, max: u64) -> Self {
481        Self {
482            builder: self.builder.max_upload_speed(max),
483            limits: self.limits,
484        }
485    }
486
487    /// Set a maximum download speed for the response body, in bytes per second.
488    pub fn max_download_speed(self, max: u64) -> Self {
489        Self {
490            builder: self.builder.max_download_speed(max),
491            limits: self.limits,
492        }
493    }
494
495    /// Set the maximum response content length allowed.
496    ///
497    /// If the `Content-Length` is present on the response and it exceeds this limit an error is
498    /// returned immediately, otherwise if [`require_length`] is not enabled an error will be returned
499    /// only when the downloaded body length exceeds the limit.
500    ///
501    /// No limit by default.
502    ///
503    /// [`require_length`]: Self::require_length
504    pub fn max_length(mut self, max: ByteLength) -> Self {
505        self.limits.max_length = Some(max);
506        self
507    }
508
509    /// Set if the `Content-Length` header must be present in the response.
510    pub fn require_length(mut self, require: bool) -> Self {
511        self.limits.require_length = require;
512        self
513    }
514
515    /// Enable or disable metrics collecting.
516    ///
517    /// When enabled you can get the information using the [`Response::metrics`] method.
518    ///
519    /// This is enabled by default.
520    pub fn metrics(self, enable: bool) -> Self {
521        Self {
522            builder: self.builder.metrics(enable),
523            limits: self.limits,
524        }
525    }
526
527    /// Build the request without a body.
528    pub fn build(self) -> Request {
529        self.body(()).unwrap()
530    }
531
532    /// Build the request with a body.
533    pub fn body(self, body: impl TryBody) -> Result<Request, Error> {
534        Ok(Request {
535            req: self.builder.body(body.try_body()?).unwrap(),
536            limits: self.limits,
537        })
538    }
539
540    /// Build the request with more custom build calls in the [inner builder].
541    ///
542    /// [inner builder]: isahc::http::request::Builder
543    pub fn build_custom<F>(self, custom: F) -> Result<Request, Error>
544    where
545        F: FnOnce(isahc::http::request::Builder) -> isahc::http::Result<isahc::Request<isahc::AsyncBody>>,
546    {
547        let req = custom(self.builder)?;
548        Ok(Request {
549            req: req.map(Body),
550            limits: self.limits,
551        })
552    }
553}
554
555/// Head parts from a split [`Response`].
556pub type ResponseParts = isahc::http::response::Parts;
557
558/// HTTP response.
559#[derive(Debug)]
560pub struct Response(isahc::Response<isahc::AsyncBody>);
561impl Response {
562    /// Returns the [`StatusCode`].
563    pub fn status(&self) -> StatusCode {
564        self.0.status()
565    }
566
567    /// Returns a reference to the associated header field map.
568    pub fn headers(&self) -> &header::HeaderMap<header::HeaderValue> {
569        self.0.headers()
570    }
571
572    /// Decode content-length value if it is present in the headers.
573    pub fn content_len(&self) -> Option<ByteLength> {
574        self.0.body().len().map(|l| ByteLength(l as usize))
575    }
576
577    /// Get the configured cookie jar used for persisting cookies from this response, if any.
578    ///
579    /// Only returns `None` if the [`default_client`] was replaced by one with cookies disabled.
580    pub fn cookie_jar(&self) -> Option<&CookieJar> {
581        self.0.cookie_jar()
582    }
583
584    /// Read the response body as a string.
585    pub async fn text(&mut self) -> std::io::Result<Txt> {
586        self.0.text().await.map(Txt::from)
587    }
588
589    /// Get the effective URI of this response. This value differs from the
590    /// original URI provided when making the request if at least one redirect
591    /// was followed.
592    pub fn effective_uri(&self) -> Option<&Uri> {
593        self.0.effective_uri()
594    }
595
596    /// Read the response body as raw bytes.
597    pub async fn bytes(&mut self) -> std::io::Result<Vec<u8>> {
598        Body::bytes_impl(self.0.body_mut()).await
599    }
600
601    /// Read some bytes from the body, returns how many bytes where read.
602    pub async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
603        BufReader::new(self.0.body_mut()).read(buf).await
604    }
605
606    /// Read the from the body to exactly fill the buffer.
607    pub async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
608        BufReader::new(self.0.body_mut()).read_exact(buf).await
609    }
610
611    /// Deserialize the response body as JSON.
612    pub async fn json<O>(&mut self) -> Result<O, serde_json::Error>
613    where
614        O: serde::de::DeserializeOwned + std::marker::Unpin,
615    {
616        self.0.json().await
617    }
618
619    /// Metrics for the task transfer.
620    ///
621    /// Metrics are enabled in the [`default_client`] and can be toggled for each request using the
622    /// [`RequestBuilder::metrics`] method. If disabled returns [`Metrics::zero`].
623    pub fn metrics(&self) -> Metrics {
624        self.0.metrics().map(Metrics::from_isahc).unwrap_or_else(Metrics::zero)
625    }
626
627    /// Drop the request without dropping the connection.
628    ///
629    /// This receives and discards any remaining bytes in the response stream. When a response
630    /// is dropped without finishing the connection is discarded so it cannot be reused for connections
631    /// older then HTTP/2.
632    ///
633    /// You should call this method before dropping if you expect the remaining bytes to be consumed quickly and
634    /// don't known that HTTP/2 or newer is being used.
635    pub async fn consume(&mut self) -> std::io::Result<()> {
636        self.0.consume().await
637    }
638
639    /// Create a response with the given status and text body message.
640    pub fn new_message(status: impl Into<StatusCode>, msg: impl Into<String>) -> Self {
641        let status = status.into();
642        let msg = msg.into().into_bytes();
643        let msg = futures_lite::io::Cursor::new(msg);
644        let mut r = isahc::Response::new(isahc::AsyncBody::from_reader(msg));
645        *r.status_mut() = status;
646        Self(r)
647    }
648
649    /// New response.
650    pub fn new(status: StatusCode, headers: header::HeaderMap<header::HeaderValue>, body: Body) -> Self {
651        let mut r = isahc::Response::new(body.0);
652        *r.status_mut() = status;
653        *r.headers_mut() = headers;
654        Self(r)
655    }
656
657    /// Consumes the response returning the head and body parts.
658    pub fn into_parts(self) -> (ResponseParts, Body) {
659        let (p, b) = self.0.into_parts();
660        (p, Body(b))
661    }
662
663    /// New response from given head and body.
664    pub fn from_parts(parts: ResponseParts, body: Body) -> Self {
665        Self(isahc::Response::from_parts(parts, body.0))
666    }
667}
668impl From<Response> for isahc::Response<isahc::AsyncBody> {
669    fn from(r: Response) -> Self {
670        r.0
671    }
672}
673
674/// HTTP request body.
675///
676/// Use [`TryBody`] to convert types to body.
677#[derive(Debug, Default)]
678pub struct Body(isahc::AsyncBody);
679impl Body {
680    /// Create a new empty body.
681    ///
682    /// An empty body represents the *absence* of a body, which is semantically different than the presence of a body of zero length.
683    pub fn empty() -> Body {
684        Body(isahc::AsyncBody::empty())
685    }
686
687    /// Create a new body from a potentially static byte buffer.
688    ///
689    /// The body will have a known length equal to the number of bytes given.
690    ///
691    /// This will try to prevent a copy if the type passed in can be re-used, otherwise the buffer
692    /// will be copied first. This method guarantees to not require a copy for the following types:
693    pub fn from_bytes_static(bytes: impl AsRef<[u8]> + 'static) -> Self {
694        Body(isahc::AsyncBody::from_bytes_static(bytes))
695    }
696
697    /// Create a streaming body of unknown length.
698    pub fn from_reader(read: impl AsyncRead + Send + Sync + 'static) -> Self {
699        Body(isahc::AsyncBody::from_reader(read))
700    }
701
702    /// Create a streaming body of with known length.
703    pub fn from_reader_sized(read: impl AsyncRead + Send + Sync + 'static, size: u64) -> Self {
704        Body(isahc::AsyncBody::from_reader_sized(read, size))
705    }
706
707    /// Report if this body is empty.
708    ///
709    /// This is not necessarily the same as checking for zero length, since HTTP message bodies are optional,
710    /// there is a semantic difference between the absence of a body and the presence of a zero-length body.
711    /// This method will only return `true` for the former.
712    pub fn is_empty(&self) -> bool {
713        self.0.is_empty()
714    }
715
716    /// Get the size of the body, if known.
717    pub fn len(&self) -> Option<u64> {
718        self.0.len()
719    }
720
721    /// If this body is repeatable, reset the body stream back to the start of the content.
722    ///
723    /// Returns false if the body cannot be reset.
724    pub fn reset(&mut self) -> bool {
725        self.0.reset()
726    }
727
728    /// Read the body as raw bytes.
729    pub async fn bytes(&mut self) -> std::io::Result<Vec<u8>> {
730        Self::bytes_impl(&mut self.0).await
731    }
732    async fn bytes_impl(body: &mut isahc::AsyncBody) -> std::io::Result<Vec<u8>> {
733        let cap = body.len().unwrap_or(1024);
734        let mut bytes = Vec::with_capacity(cap as usize);
735        super::io::copy(body, &mut bytes).await?;
736        Ok(bytes)
737    }
738
739    /// Read the body and try to convert to UTF-8.
740    ///
741    /// Consider using [`Response::text`], it uses the header encoding information if available.
742    pub async fn text_utf8(&mut self) -> Result<Txt, Box<dyn std::error::Error>> {
743        let bytes = self.bytes().await?;
744        let r = String::from_utf8(bytes)?;
745        Ok(Txt::from(r))
746    }
747
748    /// Read some bytes from the body, returns how many bytes where read.
749    pub async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
750        BufReader::new(&mut self.0).read(buf).await
751    }
752
753    /// Read the from the body to exactly fill the buffer.
754    pub async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
755        BufReader::new(&mut self.0).read_exact(buf).await
756    }
757}
758impl From<Body> for isahc::AsyncBody {
759    fn from(r: Body) -> Self {
760        r.0
761    }
762}
763impl From<isahc::AsyncBody> for Body {
764    fn from(r: isahc::AsyncBody) -> Self {
765        Body(r)
766    }
767}
768impl From<()> for Body {
769    fn from(body: ()) -> Self {
770        Body(body.into())
771    }
772}
773impl From<String> for Body {
774    fn from(body: String) -> Self {
775        Body(body.into())
776    }
777}
778impl From<Txt> for Body {
779    fn from(body: Txt) -> Self {
780        Body(String::from(body).into())
781    }
782}
783impl From<Vec<u8>> for Body {
784    fn from(body: Vec<u8>) -> Self {
785        Body(body.into())
786    }
787}
788impl From<&'_ [u8]> for Body {
789    fn from(body: &[u8]) -> Self {
790        body.to_vec().into()
791    }
792}
793impl From<&'_ str> for Body {
794    fn from(body: &str) -> Self {
795        body.as_bytes().into()
796    }
797}
798impl<T: Into<Self>> From<Option<T>> for Body {
799    fn from(body: Option<T>) -> Self {
800        match body {
801            Some(body) => body.into(),
802            None => Self::empty(),
803        }
804    }
805}
806impl AsyncRead for Body {
807    fn poll_read(
808        self: std::pin::Pin<&mut Self>,
809        cx: &mut std::task::Context<'_>,
810        buf: &mut [u8],
811    ) -> std::task::Poll<std::io::Result<usize>> {
812        Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
813    }
814}
815
816/// Send a GET request to the `uri`.
817///
818/// The [`default_client`] is used to send the request.
819pub async fn get(uri: impl TryUri) -> Result<Response, Error> {
820    default_client().get(uri).await
821}
822
823/// Send a GET request to the `uri` and read the response as a string.
824///
825/// The [`default_client`] is used to send the request.
826pub async fn get_txt(uri: impl TryUri) -> Result<Txt, Error> {
827    default_client().get_txt(uri).await
828}
829
830/// Send a GET request to the `uri` and read the response as raw bytes.
831///
832/// The [`default_client`] is used to send the request.
833pub async fn get_bytes(uri: impl TryUri) -> Result<Vec<u8>, Error> {
834    default_client().get_bytes(uri).await
835}
836
837/// Send a GET request to the `uri` and de-serializes the response.
838///
839/// The [`default_client`] is used to send the request.
840pub async fn get_json<O>(uri: impl TryUri) -> Result<O, Box<dyn std::error::Error>>
841where
842    O: serde::de::DeserializeOwned + std::marker::Unpin,
843{
844    default_client().get_json(uri).await
845}
846
847/// Send a HEAD request to the `uri`.
848///
849/// The [`default_client`] is used to send the request.
850pub async fn head(uri: impl TryUri) -> Result<Response, Error> {
851    default_client().head(uri).await
852}
853
854/// Send a PUT request to the `uri` with a given request body.
855///
856/// The [`default_client`] is used to send the request.
857pub async fn put(uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
858    default_client().put(uri, body).await
859}
860
861/// Send a POST request to the `uri` with a given request body.
862///
863/// The [`default_client`] is used to send the request.
864pub async fn post(uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
865    default_client().post(uri, body).await
866}
867
868/// Send a DELETE request to the `uri`.
869///
870/// The [`default_client`] is used to send the request.
871pub async fn delete(uri: impl TryUri) -> Result<Response, Error> {
872    default_client().delete(uri).await
873}
874
875/// Send a custom [`Request`].
876///
877/// The [`default_client`] is used to send the request.
878pub async fn send(request: Request) -> Result<Response, Error> {
879    default_client().send(request).await
880}
881
882/// The [`Client`] used by the functions in this module.
883///
884/// You can replace the default client at the start of the process using [`set_default_client_init`].
885///
886/// # Defaults
887///
888/// The default client is created using [`Client::new`].
889///
890/// [`isahc`]: https://docs.rs/isahc
891pub fn default_client() -> &'static Client {
892    use once_cell::sync::Lazy;
893
894    static SHARED: Lazy<Client> = Lazy::new(|| {
895        let ci = mem::replace(&mut *CLIENT_INIT.lock(), ClientInit::Inited);
896        if let ClientInit::Set(init) = ci {
897            init()
898        } else {
899            // browser defaults
900            Client::new()
901        }
902    });
903    &SHARED
904}
905
906static CLIENT_INIT: Mutex<ClientInit> = const_mutex(ClientInit::None);
907
908enum ClientInit {
909    None,
910    Set(Box<dyn FnOnce() -> Client + Send>),
911    Inited,
912}
913
914/// Set a custom initialization function for the [`default_client`].
915///
916/// The [`default_client`] is used by all functions in this module and is initialized on the first usage,
917/// you can use this function before any HTTP operation to replace the [`isahc`] client.
918///
919/// Returns an error if the [`default_client`] was already initialized.
920///
921/// [`isahc`]: https://docs.rs/isahc
922pub fn set_default_client_init<I>(init: I) -> Result<(), DefaultAlreadyInitedError>
923where
924    I: FnOnce() -> Client + Send + 'static,
925{
926    let mut ci = CLIENT_INIT.lock();
927    if let ClientInit::Inited = &*ci {
928        Err(DefaultAlreadyInitedError)
929    } else {
930        *ci = ClientInit::Set(Box::new(init));
931        Ok(())
932    }
933}
934
935/// Error returned by [`set_default_client_init`] if the default was already initialized.
936#[derive(Debug, Clone, Copy)]
937pub struct DefaultAlreadyInitedError;
938impl fmt::Display for DefaultAlreadyInitedError {
939    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
940        write!(f, "default client already initialized, can only set before first use")
941    }
942}
943impl std::error::Error for DefaultAlreadyInitedError {}
944
945/// Information about the state of an HTTP request.
946#[derive(Debug, Clone, PartialEq, Eq)]
947pub struct Metrics {
948    /// Number of bytes uploaded / estimated total.
949    pub upload_progress: (ByteLength, ByteLength),
950
951    /// Average upload speed so far in bytes/second.
952    pub upload_speed: ByteLength,
953
954    /// Number of bytes downloaded / estimated total.
955    pub download_progress: (ByteLength, ByteLength),
956
957    /// Average download speed so far in bytes/second.
958    pub download_speed: ByteLength,
959
960    /// Total time from the start of the request until DNS name resolving was completed.
961    ///
962    /// When a redirect is followed, the time from each request is added together.
963    pub name_lookup_time: Duration,
964
965    /// Amount of time taken to establish a connection to the server (not including TLS connection time).
966    ///
967    /// When a redirect is followed, the time from each request is added together.
968    pub connect_time: Duration,
969
970    /// Amount of time spent on TLS handshakes.
971    ///
972    /// When a redirect is followed, the time from each request is added together.
973    pub secure_connect_time: Duration,
974
975    /// Time it took from the start of the request until the first byte is either sent or received.
976    ///
977    /// When a redirect is followed, the time from each request is added together.
978    pub transfer_start_time: Duration,
979
980    /// Amount of time spent performing the actual request transfer. The “transfer” includes
981    /// both sending the request and receiving the response.
982    ///
983    /// When a redirect is followed, the time from each request is added together.
984    pub transfer_time: Duration,
985
986    /// Total time for the entire request. This will continuously increase until the entire
987    /// response body is consumed and completed.
988    ///
989    /// When a redirect is followed, the time from each request is added together.
990    pub total_time: Duration,
991
992    /// If automatic redirect following is enabled, the total time taken for all redirection steps
993    /// including name lookup, connect, pre-transfer and transfer before final transaction was started.
994    pub redirect_time: Duration,
995}
996impl Metrics {
997    /// Init from `isahc::Metrics`.
998    pub fn from_isahc(m: &isahc::Metrics) -> Self {
999        Self {
1000            upload_progress: {
1001                let (c, t) = m.upload_progress();
1002                ((c as usize).bytes(), (t as usize).bytes())
1003            },
1004            upload_speed: (m.upload_speed().round() as usize).bytes(),
1005            download_progress: {
1006                let (c, t) = m.download_progress();
1007                ((c as usize).bytes(), (t as usize).bytes())
1008            },
1009            download_speed: (m.download_speed().round() as usize).bytes(),
1010            name_lookup_time: m.name_lookup_time(),
1011            connect_time: m.connect_time(),
1012            secure_connect_time: m.secure_connect_time(),
1013            transfer_start_time: m.transfer_start_time(),
1014            transfer_time: m.transfer_time(),
1015            total_time: m.total_time(),
1016            redirect_time: m.redirect_time(),
1017        }
1018    }
1019
1020    /// All zeros.
1021    pub fn zero() -> Self {
1022        Self {
1023            upload_progress: (0.bytes(), 0.bytes()),
1024            upload_speed: 0.bytes(),
1025            download_progress: (0.bytes(), 0.bytes()),
1026            download_speed: 0.bytes(),
1027            name_lookup_time: Duration::ZERO,
1028            connect_time: Duration::ZERO,
1029            secure_connect_time: Duration::ZERO,
1030            transfer_start_time: Duration::ZERO,
1031            transfer_time: Duration::ZERO,
1032            total_time: Duration::ZERO,
1033            redirect_time: Duration::ZERO,
1034        }
1035    }
1036}
1037impl From<isahc::Metrics> for Metrics {
1038    fn from(m: isahc::Metrics) -> Self {
1039        Metrics::from_isahc(&m)
1040    }
1041}
1042impl fmt::Display for Metrics {
1043    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1044        let mut ws = false; // written something
1045
1046        if self.upload_progress.0 != self.upload_progress.1 {
1047            write!(
1048                f,
1049                "↑ {} - {}, {}/s",
1050                self.upload_progress.0, self.upload_progress.1, self.upload_speed
1051            )?;
1052            ws = true;
1053        }
1054        if self.download_progress.0 != self.download_progress.1 {
1055            write!(
1056                f,
1057                "{}↓ {} - {}, {}/s",
1058                if ws { "\n" } else { "" },
1059                self.download_progress.0,
1060                self.download_progress.1,
1061                self.download_speed
1062            )?;
1063            ws = true;
1064        }
1065
1066        if !ws {
1067            if self.upload_progress.1.bytes() > 0 {
1068                write!(f, "↑ {}", self.upload_progress.1)?;
1069                ws = true;
1070            }
1071            if self.download_progress.1.bytes() > 0 {
1072                write!(f, "{}↓ {}", if ws { "\n" } else { "" }, self.download_progress.1)?;
1073                ws = true;
1074            }
1075
1076            if ws {
1077                write!(f, "\n{:?}", self.total_time)?;
1078            }
1079        }
1080
1081        Ok(())
1082    }
1083}
1084impl_from_and_into_var! {
1085    fn from(metrics: Metrics) -> Progress {
1086        let mut status = Progress::indeterminate();
1087        if metrics.download_progress.1 > 0.bytes() {
1088            status = Progress::from_n_of(metrics.download_progress.0 .0, metrics.download_progress.1 .0);
1089        }
1090        if metrics.upload_progress.1 > 0.bytes() {
1091            let u_status = Progress::from_n_of(metrics.upload_progress.0 .0, metrics.upload_progress.1 .0);
1092            if status.is_indeterminate() {
1093                status = u_status;
1094            } else {
1095                status = status.and_fct(u_status.fct());
1096            }
1097        }
1098        status.with_msg(formatx!("{metrics}")).with_meta_mut(|mut m| {
1099            m.set(*METRICS_ID, metrics);
1100        })
1101    }
1102}
1103zng_state_map::static_id! {
1104    /// Metrics in a [`Progress::with_meta`] metadata.
1105    pub static ref METRICS_ID: zng_state_map::StateId<Metrics>;
1106}
1107
1108/// HTTP client.
1109///
1110/// An HTTP client acts as a session for executing one of more HTTP requests.
1111pub struct Client {
1112    client: isahc::HttpClient,
1113    cache: Option<Box<dyn CacheDb>>,
1114    cache_mode: Arc<dyn Fn(&Request) -> CacheMode + Send + Sync>,
1115}
1116impl Default for Client {
1117    fn default() -> Self {
1118        Self::new()
1119    }
1120}
1121impl Clone for Client {
1122    fn clone(&self) -> Self {
1123        Client {
1124            client: self.client.clone(),
1125            cache: self.cache.as_ref().map(|b| b.clone_boxed()),
1126            cache_mode: self.cache_mode.clone(),
1127        }
1128    }
1129}
1130impl fmt::Debug for Client {
1131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1132        f.debug_struct("Client").finish_non_exhaustive()
1133    }
1134}
1135impl Client {
1136    /// New client with default config.
1137    ///
1138    /// This enables cookies, sets the `redirect_policy` with a limit of up-to 20 redirects and `auto_referer`, sets
1139    /// the `connect_timeout` to 90 seconds and enables `metrics`.
1140    pub fn new() -> Self {
1141        Client::builder()
1142            .cookies()
1143            .redirect_policy(RedirectPolicy::Limit(20))
1144            .connect_timeout(90.secs())
1145            .metrics(true)
1146            .build()
1147    }
1148
1149    /// Start a new [`ClientBuilder`] for creating a custom client.
1150    pub fn builder() -> ClientBuilder {
1151        ClientBuilder {
1152            builder: isahc::HttpClient::builder(),
1153            cache: None,
1154            cache_mode: None,
1155        }
1156    }
1157
1158    /// Gets the configured cookie-jar for this client, if cookies are enabled.
1159    pub fn cookie_jar(&self) -> Option<&CookieJar> {
1160        self.client.cookie_jar()
1161    }
1162
1163    /// Send a GET request to the `uri`.
1164    pub async fn get(&self, uri: impl TryUri) -> Result<Response, Error> {
1165        self.send(Request::get(uri)?.build()).await
1166    }
1167
1168    /// Send a GET request to the `uri` and read the response as a string.
1169    pub async fn get_txt(&self, uri: impl TryUri) -> Result<Txt, Error> {
1170        let mut r = self.get(uri).await?;
1171        let r = r.text().await?;
1172        Ok(r)
1173    }
1174
1175    /// Send a GET request to the `uri` and read the response as raw bytes.
1176    pub async fn get_bytes(&self, uri: impl TryUri) -> Result<Vec<u8>, Error> {
1177        let mut r = self.get(uri).await?;
1178        let r = r.bytes().await?;
1179        Ok(r)
1180    }
1181
1182    /// Send a GET request to the `uri` and de-serializes the response.
1183    pub async fn get_json<O>(&self, uri: impl TryUri) -> Result<O, Box<dyn std::error::Error>>
1184    where
1185        O: serde::de::DeserializeOwned + std::marker::Unpin,
1186    {
1187        let mut r = self.get(uri).await?;
1188        let r = r.json::<O>().await?;
1189        Ok(r)
1190    }
1191
1192    /// Send a HEAD request to the `uri`.
1193    pub async fn head(&self, uri: impl TryUri) -> Result<Response, Error> {
1194        self.send(Request::head(uri)?.build()).await
1195    }
1196    /// Send a PUT request to the `uri` with a given request body.
1197    pub async fn put(&self, uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
1198        self.send(Request::put(uri)?.body(body)?).await
1199    }
1200
1201    /// Send a POST request to the `uri` with a given request body.
1202    pub async fn post(&self, uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
1203        self.send(Request::post(uri)?.body(body)?).await
1204    }
1205
1206    /// Send a DELETE request to the `uri`.
1207    pub async fn delete(&self, uri: impl TryUri) -> Result<Response, Error> {
1208        self.send(Request::delete(uri)?.build()).await
1209    }
1210
1211    /// Send a custom [`Request`].
1212    ///
1213    /// # Cache
1214    ///
1215    /// If the client has a [`cache`] and the request uses the `GET` method the result will be cached
1216    /// according with the [`cache_mode`] selected for the request.
1217    ///
1218    /// [`cache`]: Self::cache
1219    /// [`cache_mode`]: Self::cache_mode
1220    pub async fn send(&self, request: Request) -> Result<Response, Error> {
1221        if let Some(db) = &self.cache {
1222            match self.cache_mode(&request) {
1223                CacheMode::NoCache => {
1224                    let response = self.client.send_async(request.req).await?;
1225                    let response = request.limits.check(response)?;
1226                    Ok(Response(response))
1227                }
1228                CacheMode::Default => self.send_cache_default(&**db, request, 0).await,
1229                CacheMode::Permanent => self.send_cache_permanent(&**db, request, 0).await,
1230                CacheMode::Error(e) => Err(e),
1231            }
1232        } else {
1233            let response = self.client.send_async(request.req).await?;
1234            let response = request.limits.check(response)?;
1235            Ok(Response(response))
1236        }
1237    }
1238
1239    #[async_recursion::async_recursion]
1240    async fn send_cache_default(&self, db: &dyn CacheDb, request: Request, retry_count: u8) -> Result<Response, Error> {
1241        if retry_count == 3 {
1242            tracing::error!("retried cache 3 times, skipping cache");
1243            let response = self.client.send_async(request.req).await?;
1244            let response = request.limits.check(response)?;
1245            return Ok(Response(response));
1246        }
1247
1248        let key = CacheKey::new(&request.req);
1249        if let Some(policy) = db.policy(&key).await {
1250            match policy.before_request(&request.req) {
1251                BeforeRequest::Fresh(parts) => {
1252                    if let Some(body) = db.body(&key).await {
1253                        let response = isahc::Response::from_parts(parts, body.0);
1254                        let response = request.limits.check(response)?;
1255
1256                        Ok(Response(response))
1257                    } else {
1258                        tracing::error!("cache returned policy but not body");
1259                        db.remove(&key).await;
1260                        self.send_cache_default(db, request, retry_count + 1).await
1261                    }
1262                }
1263                BeforeRequest::Stale { request: parts, matches } => {
1264                    if matches {
1265                        let (_, body) = request.req.into_parts();
1266                        let request = Request {
1267                            req: isahc::Request::from_parts(parts, body),
1268                            limits: request.limits,
1269                        };
1270                        let policy_request = request.clone_with(()).unwrap().req;
1271                        let no_req_body = request.req.body().len().map(|l| l == 0).unwrap_or(false);
1272
1273                        let response = self.client.send_async(request.req).await?;
1274                        let response = request.limits.check(response)?;
1275
1276                        match policy.after_response(&policy_request, &response) {
1277                            AfterResponse::NotModified(policy, parts) => {
1278                                if let Some(body) = db.body(&key).await {
1279                                    let response = isahc::Response::from_parts(parts, body.0);
1280
1281                                    db.set_policy(&key, policy).await;
1282
1283                                    Ok(Response(response))
1284                                } else {
1285                                    tracing::error!("cache returned policy but not body");
1286                                    db.remove(&key).await;
1287
1288                                    if no_req_body {
1289                                        self.send_cache_default(
1290                                            db,
1291                                            Request {
1292                                                req: policy_request,
1293                                                limits: request.limits,
1294                                            },
1295                                            retry_count + 1,
1296                                        )
1297                                        .await
1298                                    } else {
1299                                        Err(std::io::Error::new(
1300                                            std::io::ErrorKind::NotFound,
1301                                            "cache returned policy but not body, cannot auto-retry",
1302                                        )
1303                                        .into())
1304                                    }
1305                                }
1306                            }
1307                            AfterResponse::Modified(policy, parts) => {
1308                                if policy.should_store() {
1309                                    let (_, body) = response.into_parts();
1310                                    if let Some(body) = db.set(&key, policy, Body(body)).await {
1311                                        let response = isahc::Response::from_parts(parts, body.0);
1312
1313                                        Ok(Response(response))
1314                                    } else {
1315                                        tracing::error!("cache db failed to store body");
1316                                        db.remove(&key).await;
1317
1318                                        if no_req_body {
1319                                            self.send_cache_default(
1320                                                db,
1321                                                Request {
1322                                                    req: policy_request,
1323                                                    limits: request.limits,
1324                                                },
1325                                                retry_count + 1,
1326                                            )
1327                                            .await
1328                                        } else {
1329                                            Err(std::io::Error::new(
1330                                                std::io::ErrorKind::NotFound,
1331                                                "cache db failed to store body, cannot auto-retry",
1332                                            )
1333                                            .into())
1334                                        }
1335                                    }
1336                                } else {
1337                                    db.remove(&key).await;
1338
1339                                    Ok(Response(response))
1340                                }
1341                            }
1342                        }
1343                    } else {
1344                        tracing::error!("cache policy did not match request, {request:?}");
1345                        db.remove(&key).await;
1346                        let response = self.client.send_async(request.req).await?;
1347                        let response = request.limits.check(response)?;
1348                        Ok(Response(response))
1349                    }
1350                }
1351            }
1352        } else {
1353            let no_req_body = request.req.body().len().map(|l| l == 0).unwrap_or(false);
1354            let policy_request = request.clone_with(()).unwrap().req;
1355
1356            let response = self.client.send_async(request.req).await?;
1357            let response = request.limits.check(response)?;
1358
1359            let policy = CachePolicy::new(&policy_request, &response);
1360
1361            if policy.should_store() {
1362                let (parts, body) = response.into_parts();
1363
1364                if let Some(body) = db.set(&key, policy, Body(body)).await {
1365                    let response = isahc::Response::from_parts(parts, body.0);
1366
1367                    Ok(Response(response))
1368                } else {
1369                    tracing::error!("cache db failed to store body");
1370                    db.remove(&key).await;
1371
1372                    if no_req_body {
1373                        self.send_cache_default(
1374                            db,
1375                            Request {
1376                                req: policy_request,
1377                                limits: request.limits,
1378                            },
1379                            retry_count + 1,
1380                        )
1381                        .await
1382                    } else {
1383                        Err(std::io::Error::new(std::io::ErrorKind::NotFound, "cache db failed to store body, cannot auto-retry").into())
1384                    }
1385                }
1386            } else {
1387                Ok(Response(response))
1388            }
1389        }
1390    }
1391
1392    #[async_recursion::async_recursion]
1393    async fn send_cache_permanent(&self, db: &dyn CacheDb, request: Request, retry_count: u8) -> Result<Response, Error> {
1394        if retry_count == 3 {
1395            tracing::error!("retried cache 3 times, skipping cache");
1396            let response = self.client.send_async(request.req).await?;
1397            let response = request.limits.check(response)?;
1398            return Ok(Response(response));
1399        }
1400
1401        let key = CacheKey::new(&request.req);
1402        if let Some(policy) = db.policy(&key).await {
1403            if let Some(body) = db.body(&key).await {
1404                match policy.before_request(&request.req) {
1405                    BeforeRequest::Fresh(p) => {
1406                        let response = isahc::Response::from_parts(p, body.0);
1407                        let response = request.limits.check(response)?;
1408
1409                        if !policy.is_permanent() {
1410                            db.set_policy(&key, CachePolicy::new_permanent(&response)).await;
1411                        }
1412
1413                        Ok(Response(response))
1414                    }
1415                    BeforeRequest::Stale { request: parts, .. } => {
1416                        // policy was not permanent when cached
1417
1418                        let limits = request.limits.clone();
1419
1420                        let (_, req_body) = request.req.into_parts();
1421                        let request = isahc::Request::from_parts(parts, req_body);
1422
1423                        let response = self.client.send_async(request).await?;
1424                        let response = limits.check(response)?;
1425
1426                        let (parts, _) = response.into_parts();
1427
1428                        let response = isahc::Response::from_parts(parts, body.0);
1429
1430                        db.set_policy(&key, CachePolicy::new_permanent(&response)).await;
1431
1432                        Ok(Response(response))
1433                    }
1434                }
1435            } else {
1436                tracing::error!("cache returned policy but not body");
1437                db.remove(&key).await;
1438                self.send_cache_permanent(db, request, retry_count + 1).await
1439            }
1440        } else {
1441            let backup_request = if request.req.body().len().map(|l| l == 0).unwrap_or(false) {
1442                Some(request.clone_with(()).unwrap())
1443            } else {
1444                None
1445            };
1446
1447            let response = self.client.send_async(request.req).await?;
1448            let response = request.limits.check(response)?;
1449            let policy = CachePolicy::new_permanent(&response);
1450
1451            let (parts, body) = response.into_parts();
1452
1453            if let Some(body) = db.set(&key, policy, Body(body)).await {
1454                let response = isahc::Response::from_parts(parts, body.0);
1455                Ok(Response(response))
1456            } else {
1457                tracing::error!("cache db failed to store body");
1458                db.remove(&key).await;
1459
1460                if let Some(request) = backup_request {
1461                    self.send_cache_permanent(db, request, retry_count + 1).await
1462                } else {
1463                    Err(std::io::Error::new(
1464                        std::io::ErrorKind::NotFound,
1465                        "cache db failed to store permanent body, cannot auto-retry",
1466                    )
1467                    .into())
1468                }
1469            }
1470        }
1471    }
1472
1473    /// Reference the cache used in this client.
1474    pub fn cache(&self) -> Option<&dyn CacheDb> {
1475        self.cache.as_deref()
1476    }
1477
1478    /// Returns the [`CacheMode`] that is used in this client if the request is made.
1479    pub fn cache_mode(&self, request: &Request) -> CacheMode {
1480        if self.cache.is_none() || request.method() != Method::GET {
1481            CacheMode::NoCache
1482        } else {
1483            (self.cache_mode)(request)
1484        }
1485    }
1486}
1487impl From<Client> for isahc::HttpClient {
1488    fn from(c: Client) -> Self {
1489        c.client
1490    }
1491}
1492impl From<isahc::HttpClient> for Client {
1493    fn from(client: isahc::HttpClient) -> Self {
1494        Self {
1495            client,
1496            cache: None,
1497            cache_mode: Arc::new(|_| CacheMode::default()),
1498        }
1499    }
1500}
1501
1502/// Builder that can be used to create a [`Client`].
1503///
1504/// Use [`Client::builder`] to start building.
1505///
1506/// # Examples
1507///
1508/// ```
1509/// use zng_task::http::*;
1510///
1511/// let client = Client::builder().metrics(true).build();
1512/// ```
1513pub struct ClientBuilder {
1514    builder: isahc::HttpClientBuilder,
1515    cache: Option<Box<dyn CacheDb>>,
1516    cache_mode: Option<Arc<dyn Fn(&Request) -> CacheMode + Send + Sync>>,
1517}
1518impl Default for ClientBuilder {
1519    fn default() -> Self {
1520        Client::builder()
1521    }
1522}
1523impl ClientBuilder {
1524    /// New default builder.
1525    pub fn new() -> Self {
1526        Client::builder()
1527    }
1528
1529    /// Build the [`Client`] using the configured options.
1530    pub fn build(self) -> Client {
1531        Client {
1532            client: self.builder.build().unwrap(),
1533            cache: self.cache,
1534            cache_mode: self.cache_mode.unwrap_or_else(|| Arc::new(|_| CacheMode::default())),
1535        }
1536    }
1537
1538    /// Build the client with more custom build calls in the [inner builder].
1539    ///
1540    /// [inner builder]: isahc::HttpClientBuilder
1541    pub fn build_custom<F>(self, custom: F) -> Result<Client, Error>
1542    where
1543        F: FnOnce(isahc::HttpClientBuilder) -> Result<isahc::HttpClient, Error>,
1544    {
1545        custom(self.builder).map(|c| Client {
1546            client: c,
1547            cache: self.cache,
1548            cache_mode: self.cache_mode.unwrap_or_else(|| Arc::new(|_| CacheMode::default())),
1549        })
1550    }
1551
1552    /// Add a default header to be passed with every request.
1553    pub fn default_header(self, key: impl TryHeaderName, value: impl TryHeaderValue) -> Result<Self, Error> {
1554        Ok(Self {
1555            builder: self.builder.default_header(key.try_header_name()?, value.try_header_value()?),
1556            cache: self.cache,
1557            cache_mode: self.cache_mode,
1558        })
1559    }
1560
1561    /// Enable persistent cookie handling for all requests using this client using a shared cookie jar.
1562    pub fn cookies(self) -> Self {
1563        Self {
1564            builder: self.builder.cookies(),
1565            cache: self.cache,
1566            cache_mode: self.cache_mode,
1567        }
1568    }
1569
1570    /// Set a cookie jar to use to accept, store, and supply cookies for incoming responses and outgoing requests.
1571    ///
1572    /// Note that the [`default_client`] already has a cookie jar.
1573    pub fn cookie_jar(self, cookie_jar: CookieJar) -> Self {
1574        Self {
1575            builder: self.builder.cookie_jar(cookie_jar),
1576            cache: self.cache,
1577            cache_mode: self.cache_mode,
1578        }
1579    }
1580
1581    /// Specify a maximum amount of time that a complete request/response cycle is allowed to
1582    /// take before being aborted. This includes DNS resolution, connecting to the server,
1583    /// writing the request, and reading the response.
1584    ///
1585    /// Note that this includes the response read operation, so if you get a response but don't
1586    /// read-it within this timeout you will get a [`TimedOut`] IO error.
1587    ///
1588    /// By default no timeout is used.
1589    ///
1590    /// [`TimedOut`]: https://doc.rust-lang.org/nightly/std/io/enum.ErrorKind.html#variant.TimedOut
1591    pub fn timeout(self, timeout: Duration) -> Self {
1592        Self {
1593            builder: self.builder.timeout(timeout),
1594            cache: self.cache,
1595            cache_mode: self.cache_mode,
1596        }
1597    }
1598
1599    /// Set a timeout for establishing connections to a host.
1600    ///
1601    /// If not set, the [`default_client`] default of 90 seconds will be used.
1602    pub fn connect_timeout(self, timeout: Duration) -> Self {
1603        Self {
1604            builder: self.builder.connect_timeout(timeout),
1605            cache: self.cache,
1606            cache_mode: self.cache_mode,
1607        }
1608    }
1609
1610    /// Specify a maximum amount of time where transfer rate can go below a minimum speed limit.
1611    ///
1612    /// The `low_speed` limit is in bytes/s. No low-speed limit is configured by default.
1613    pub fn low_speed_timeout(self, low_speed: u32, timeout: Duration) -> Self {
1614        Self {
1615            builder: self.builder.low_speed_timeout(low_speed, timeout),
1616            cache: self.cache,
1617            cache_mode: self.cache_mode,
1618        }
1619    }
1620
1621    /// Set a policy for automatically following server redirects.
1622    ///
1623    /// If enabled the "Referer" header will be set automatically too.
1624    pub fn redirect_policy(self, policy: RedirectPolicy) -> Self {
1625        if !matches!(policy, RedirectPolicy::None) {
1626            Self {
1627                builder: self.builder.redirect_policy(policy).auto_referer(),
1628                cache: self.cache,
1629                cache_mode: self.cache_mode,
1630            }
1631        } else {
1632            Self {
1633                builder: self.builder.redirect_policy(policy),
1634                cache: self.cache,
1635                cache_mode: self.cache_mode,
1636            }
1637        }
1638    }
1639
1640    /// Enable or disable automatic decompression of the response body.
1641    ///
1642    /// If enabled the "Accept-Encoding" will also be set automatically, if it was not set using [`default_header`].
1643    ///
1644    /// This is enabled by default.
1645    ///
1646    /// [`default_header`]: Self::default_header
1647    pub fn auto_decompress(self, enabled: bool) -> Self {
1648        Self {
1649            builder: self.builder.automatic_decompression(enabled),
1650            cache: self.cache,
1651            cache_mode: self.cache_mode,
1652        }
1653    }
1654
1655    /// Set a maximum upload speed for the request body, in bytes per second.
1656    pub fn max_upload_speed(self, max: u64) -> Self {
1657        Self {
1658            builder: self.builder.max_upload_speed(max),
1659            cache: self.cache,
1660            cache_mode: self.cache_mode,
1661        }
1662    }
1663
1664    /// Set a maximum download speed for the response body, in bytes per second.
1665    pub fn max_download_speed(self, max: u64) -> Self {
1666        Self {
1667            builder: self.builder.max_download_speed(max),
1668            cache: self.cache,
1669            cache_mode: self.cache_mode,
1670        }
1671    }
1672
1673    /// Enable or disable metrics collecting.
1674    ///
1675    /// When enabled you can get the information using the [`Response::metrics`] method.
1676    ///
1677    /// This is enabled by default.
1678    pub fn metrics(self, enable: bool) -> Self {
1679        Self {
1680            builder: self.builder.metrics(enable),
1681            cache: self.cache,
1682            cache_mode: self.cache_mode,
1683        }
1684    }
1685
1686    /// Sets the [`CacheDb`] to use.
1687    ///
1688    /// Caching is only enabled if there is a DB, no caching is done by default.
1689    pub fn cache(self, cache: impl CacheDb) -> Self {
1690        Self {
1691            builder: self.builder,
1692            cache: Some(Box::new(cache)),
1693            cache_mode: self.cache_mode,
1694        }
1695    }
1696
1697    /// Sets the [`CacheMode`] selector.
1698    ///
1699    /// The `selector` closure is called for every cacheable request before it is made, it
1700    /// must return a [`CacheMode`] value that configures how the [`cache`] is used.
1701    ///
1702    /// Note that the closure is only called if a [`cache`] is set.
1703    ///
1704    /// [`cache`]: Self::cache
1705    pub fn cache_mode(self, selector: impl Fn(&Request) -> CacheMode + Send + Sync + 'static) -> Self {
1706        Self {
1707            builder: self.builder,
1708            cache: self.cache,
1709            cache_mode: Some(Arc::new(selector)),
1710        }
1711    }
1712}
1713
1714/// An error encountered while sending an HTTP request or receiving an HTTP response using a [`Client`].
1715#[derive(Debug, Clone)]
1716#[non_exhaustive]
1717pub enum Error {
1718    /// Error from the HTTP client.
1719    Client(isahc::Error),
1720    /// Error when [`max_length`] validation fails at the header or after streaming download.
1721    ///
1722    /// [`max_length`]: RequestBuilder::max_length
1723    MaxLength {
1724        /// The `Content-Length` header value, if it was set.
1725        content_length: Option<ByteLength>,
1726        /// The maximum allowed length.
1727        max_length: ByteLength,
1728    },
1729    /// Error when [`require_length`] is set, but a response was sent without the `Content-Length` header.
1730    ///
1731    /// [`require_length`]: RequestBuilder::require_length
1732    RequireLength,
1733}
1734impl StdError for Error {
1735    fn source(&self) -> Option<&(dyn StdError + 'static)> {
1736        match self {
1737            Error::Client(e) => Some(e),
1738            _ => None,
1739        }
1740    }
1741}
1742impl From<isahc::Error> for Error {
1743    fn from(e: isahc::Error) -> Self {
1744        if let Some(e) = e
1745            .source()
1746            .and_then(|e| e.downcast_ref::<std::io::Error>())
1747            .and_then(|e| e.get_ref())
1748        {
1749            if let Some(e) = e.downcast_ref::<MaxLengthError>() {
1750                return Error::MaxLength {
1751                    content_length: e.0,
1752                    max_length: e.1,
1753                };
1754            }
1755            if e.downcast_ref::<RequireLengthError>().is_some() {
1756                return Error::RequireLength;
1757            }
1758        }
1759        Error::Client(e)
1760    }
1761}
1762impl From<isahc::http::Error> for Error {
1763    fn from(e: isahc::http::Error) -> Self {
1764        isahc::Error::from(e).into()
1765    }
1766}
1767impl From<std::io::Error> for Error {
1768    fn from(e: std::io::Error) -> Self {
1769        isahc::Error::from(e).into()
1770    }
1771}
1772impl fmt::Display for Error {
1773    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1774        match self {
1775            Error::Client(e) => write!(f, "{e}"),
1776            Error::MaxLength {
1777                content_length,
1778                max_length,
1779            } => write!(f, "{}", MaxLengthError(*content_length, *max_length)),
1780            Error::RequireLength => write!(f, "{RequireLengthError}"),
1781        }
1782    }
1783}
1784
1785// Error types smuggled inside an io::Error inside the isahc::Error.
1786
1787#[derive(Debug)]
1788struct MaxLengthError(Option<ByteLength>, ByteLength);
1789impl fmt::Display for MaxLengthError {
1790    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1791        if let Some(l) = self.0 {
1792            write!(f, "content-length of {l} exceeds limit of {}", self.1)
1793        } else {
1794            write!(f, "download reached limit of {}", self.1)
1795        }
1796    }
1797}
1798impl StdError for MaxLengthError {}
1799
/// Marker error for a response that lacks the `Content-Length` header the request required.
#[derive(Debug)]
struct RequireLengthError;
impl fmt::Display for RequireLengthError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("content-length is required")
    }
}
impl StdError for RequireLengthError {}