zng_task/
http.rs

1#![cfg(feature = "http")]
2
3//! HTTP client.
4//!
//! This module provides a backend-agnostic HTTP client API. By default it uses the system `curl` command
//! line utility with a simple cache; both can be replaced by implementing [`HttpClient`] and [`HttpCache`].
7//!
8
9mod cache;
10mod ctx;
11mod curl;
12mod file_cache;
13mod util;
14
15pub use cache::{CacheKey, CacheMode, CachePolicy};
16pub use ctx::{HttpCache, HttpClient, http_cache, http_client, set_http_cache, set_http_client, set_request_default};
17pub use curl::CurlProcessClient;
18pub use file_cache::FileSystemCache;
19
/// Any error during request or response.
///
/// A boxed trait object, so errors from any client/cache backend can be returned. It is
/// `Send + Sync` so it can be moved across tasks and threads.
pub type Error = Box<dyn std::error::Error + Send + Sync>;
22
23pub use http::{
24    StatusCode, header,
25    method::{self, Method},
26    uri::{self, Uri},
27};
28use serde::{Deserialize, Serialize};
29use zng_var::{Var, const_var};
30
31use std::time::Duration;
32use std::{fmt, mem};
33
34use crate::{channel::IpcBytes, http::ctx::REQUEST_DEFAULT, io::Metrics};
35
36use super::io::AsyncRead;
37
38use zng_txt::{ToTxt, Txt};
39use zng_unit::*;
40
/// HTTP request.
///
/// Use [`send`] to send a request. Requests are usually created with [`Request::new`] or one of the
/// method shortcuts (`get`, `put`, `post`, ...) and then configured with the builder methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct Request {
    /// The URI.
    #[serde(with = "http_serde::uri")]
    pub uri: Uri,
    /// The HTTP method.
    #[serde(with = "http_serde::method")]
    pub method: Method,

    /// Header values.
    ///
    /// Is empty by default.
    #[serde(with = "http_serde::header_map")]
    pub headers: http::HeaderMap,

    /// Maximum amount of time that a complete request/response cycle is allowed to
    /// take before being aborted. This includes DNS resolution, connecting to the server,
    /// writing the request, and reading the response.
    ///
    /// Note that this includes the response read operation, so if you get a response but don't
    /// read it within this timeout you will get a [`TimedOut`] IO error.
    ///
    /// By default no timeout is used, [`Duration::MAX`].
    ///
    /// [`TimedOut`]: https://doc.rust-lang.org/nightly/std/io/enum.ErrorKind.html#variant.TimedOut
    pub timeout: Duration,

    /// Maximum amount of time to wait while establishing a connection to a host.
    ///
    /// Is 90 seconds by default.
    pub connect_timeout: Duration,

    /// Maximum amount of time allowed while the transfer speed is under the given speed in bytes per second.
    ///
    /// By default no timeout is used, `(Duration::MAX, 0)`.
    pub low_speed_timeout: (Duration, ByteLength),

    /// Maximum redirects to follow.
    ///
    /// When redirecting the `Referer` header is updated automatically.
    ///
    /// Is `20` by default.
    pub redirect_limit: u16,

    /// If received data should be automatically decompressed.
    ///
    /// If enabled the "Accept-Encoding" header will also be set automatically, if it was not set on the headers.
    ///
    /// This is enabled by default.
    #[cfg(feature = "http_compression")]
    pub auto_decompress: bool,

    /// Maximum upload speed in bytes per second.
    ///
    /// No maximum by default, [`ByteLength::MAX`].
    pub max_upload_speed: ByteLength,

    /// Maximum download speed in bytes per second.
    ///
    /// No maximum by default, [`ByteLength::MAX`].
    pub max_download_speed: ByteLength,

    /// If the `Content-Length` header must be present in the response.
    ///
    /// By default this is not required.
    pub require_length: bool,

    /// Set the maximum response content length allowed.
    ///
    /// If the `Content-Length` is present on the response and it exceeds this limit an error is
    /// returned immediately, otherwise if [`require_length`] is not enabled an error will be returned
    /// only when the downloaded body length exceeds the limit.
    ///
    /// By default no limit is set, [`ByteLength::MAX`].
    ///
    /// [`require_length`]: Request::require_length
    pub max_length: ByteLength,

    /// Response cache mode.
    ///
    /// Is [`CacheMode::Default`] by default.
    pub cache: CacheMode,

    /// If cookies should be sent and stored.
    ///
    /// When enabled the [`http_cache`] is used to retrieve and store cookies.
    ///
    /// Is not enabled by default.
    #[cfg(feature = "http_cookie")]
    pub cookies: bool,

    /// If transfer metrics should be measured.
    ///
    /// When enabled you can get the information using the [`Response::metrics`] method.
    ///
    /// This is enabled by default.
    pub metrics: bool,

    /// Request body content.
    ///
    /// Is empty by default.
    pub body: IpcBytes,
}
148impl Request {
149    /// Starts building a request.
150    ///
151    /// # Examples
152    ///
153    /// ```
154    /// use zng_task::http;
155    ///
156    /// # fn try_example() -> Result<(), http::Error> {
157    /// let request = http::Request::new(http::Method::PUT, "https://httpbin.org/put".try_into()?);
158    /// # Ok(()) }
159    /// ```
160    pub fn new(method: Method, uri: Uri) -> Self {
161        match REQUEST_DEFAULT.lock().clone() {
162            Some(mut r) => {
163                r.method = method;
164                r.uri = uri;
165                r
166            }
167            None => Self {
168                uri,
169                method,
170                require_length: false,
171                max_length: ByteLength::MAX,
172                headers: header::HeaderMap::new(),
173                timeout: Duration::MAX,
174                connect_timeout: 90.secs(),
175                low_speed_timeout: (Duration::MAX, 0.bytes()),
176                redirect_limit: 20,
177                #[cfg(feature = "http_compression")]
178                auto_decompress: true,
179                max_upload_speed: ByteLength::MAX,
180                max_download_speed: ByteLength::MAX,
181                cache: CacheMode::Default,
182                #[cfg(feature = "http_cookie")]
183                cookies: false,
184                metrics: true,
185                body: IpcBytes::default(),
186            },
187        }
188    }
189
190    /// Starts building a GET request.
191    ///
192    /// # Examples
193    ///
194    /// ```
195    /// use zng_task::http;
196    ///
197    /// # fn try_example() -> Result<(), http::Error> {
198    /// let get = http::Request::get("https://httpbin.org/get")?;
199    /// # Ok(()) }
200    /// ```
201    pub fn get<U: TryInto<Uri>>(uri: U) -> Result<Self, <U as TryInto<Uri>>::Error> {
202        Ok(Self::new(Method::GET, uri.try_into()?))
203    }
204
205    /// Starts building a PUT request.
206    ///
207    /// # Examples
208    ///
209    /// ```
210    /// use zng_task::http;
211    ///
212    /// # fn try_example() -> Result<(), http::Error> {
213    /// let put = http::Request::put("https://httpbin.org/put")?.header("accept", "application/json")?;
214    /// # Ok(()) }
215    /// ```
216    pub fn put<U: TryInto<Uri>>(uri: U) -> Result<Self, <U as TryInto<Uri>>::Error> {
217        Ok(Self::new(Method::PUT, uri.try_into()?))
218    }
219
220    /// Starts building a POST request.
221    ///
222    /// # Examples
223    ///
224    /// ```
225    /// use zng_task::http;
226    ///
227    /// # fn try_example() -> Result<(), http::Error> {
228    /// let post = http::Request::post("https://httpbin.org/post")?.header("accept", "application/json")?;
229    /// # Ok(()) }
230    /// ```
231    pub fn post<U: TryInto<Uri>>(uri: U) -> Result<Self, <U as TryInto<Uri>>::Error> {
232        Ok(Self::new(Method::POST, uri.try_into()?))
233    }
234
235    /// Starts building a DELETE request.
236    ///
237    /// # Examples
238    ///
239    /// ```
240    /// use zng_task::http;
241    ///
242    /// # fn try_example() -> Result<(), http::Error> {
243    /// let delete = http::Request::delete("https://httpbin.org/delete")?.header("accept", "application/json")?;
244    /// # Ok(()) }
245    /// ```
246    pub fn delete<U: TryInto<Uri>>(uri: U) -> Result<Self, <U as TryInto<Uri>>::Error> {
247        Ok(Self::new(Method::DELETE, uri.try_into()?))
248    }
249
250    /// Starts building a PATCH request.
251    ///
252    /// # Examples
253    ///
254    /// ```
255    /// use zng_task::http;
256    ///
257    /// # fn try_example() -> Result<(), http::Error> {
258    /// let patch = http::Request::patch("https://httpbin.org/patch")?.header("accept", "application/json")?;
259    /// # Ok(()) }
260    /// ```
261    pub fn patch<U: TryInto<Uri>>(uri: U) -> Result<Self, <U as TryInto<Uri>>::Error> {
262        Ok(Self::new(Method::PATCH, uri.try_into()?))
263    }
264
265    /// Starts building a HEAD request.
266    ///
267    /// # Examples
268    ///
269    /// ```
270    /// use zng_task::http;
271    ///
272    /// # fn try_example() -> Result<(), http::Error> {
273    /// let head = http::Request::head("https://httpbin.org")?;
274    /// # Ok(()) }
275    /// ```
276    pub fn head<U: TryInto<Uri>>(uri: U) -> Result<Self, <U as TryInto<Uri>>::Error> {
277        Ok(Self::new(Method::HEAD, uri.try_into()?))
278    }
279
280    /// Appends a header to [`headers`] to this request.
281    ///
282    /// [`headers`]: field@Request::headers
283    pub fn header<K, V>(mut self, name: K, value: V) -> Result<Self, Error>
284    where
285        K: TryInto<header::HeaderName>,
286        V: TryInto<header::HeaderValue>,
287        Error: From<<K as TryInto<header::HeaderName>>::Error>,
288        Error: From<<V as TryInto<header::HeaderValue>>::Error>,
289    {
290        self.headers.insert(name.try_into()?, value.try_into()?);
291        Ok(self)
292    }
293
294    /// Set the [`timeout`].
295    ///
296    /// [`timeout`]: field@Request::timeout
297    pub fn timeout(mut self, timeout: Duration) -> Self {
298        self.timeout = timeout;
299        self
300    }
301
302    /// Set the [`connect_timeout`].
303    ///
304    /// [`connect_timeout`]: field@Request::connect_timeout
305    pub fn connect_timeout(mut self, timeout: Duration) -> Self {
306        self.connect_timeout = timeout;
307        self
308    }
309
310    /// Set the [`low_speed_timeout`].
311    ///
312    /// [`low_speed_timeout`]: field@Request::low_speed_timeout
313    pub fn low_speed_timeout(mut self, timeout: Duration, bytes_per_sec: ByteLength) -> Self {
314        self.low_speed_timeout = (timeout, bytes_per_sec);
315        self
316    }
317
318    /// Set the [`redirect_limit`].
319    ///
320    /// [`redirect_limit`]: field@Request::redirect_limit
321    pub fn redirect_limit(mut self, count: u16) -> Self {
322        self.redirect_limit = count;
323        self
324    }
325
326    /// Set the [`auto_decompress`].
327    ///
328    /// [`auto_decompress`]: field@Request::auto_decompress
329    #[cfg(feature = "http_compression")]
330    pub fn auto_decompress(mut self, enabled: bool) -> Self {
331        self.auto_decompress = enabled;
332        self
333    }
334
335    /// Set [`require_length`].
336    ///
337    /// [`require_length`]: field@Request::require_length
338    pub fn require_length(mut self, enabled: bool) -> Self {
339        self.require_length = enabled;
340        self
341    }
342
343    /// Set [`max_length`].
344    ///
345    /// [`max_length`]: field@Request::max_length
346    pub fn max_length(mut self, max: ByteLength) -> Self {
347        self.max_length = max;
348        self
349    }
350
351    /// Set the [`max_upload_speed`].
352    ///
353    /// [`max_upload_speed`]: field@Request::max_upload_speed
354    pub fn max_upload_speed(mut self, bytes_per_sec: ByteLength) -> Self {
355        self.max_upload_speed = bytes_per_sec;
356        self
357    }
358
359    /// Set the [`max_download_speed`].
360    ///
361    /// [`max_download_speed`]: field@Request::max_download_speed
362    pub fn max_download_speed(mut self, bytes_per_sec: ByteLength) -> Self {
363        self.max_download_speed = bytes_per_sec;
364        self
365    }
366
367    /// Set the [`cookies`].
368    ///
369    /// [`cookies`]: field@Request::cookies
370    #[cfg(feature = "http_compression")]
371    pub fn cookies(mut self, enable: bool) -> Self {
372        self.cookies = enable;
373        self
374    }
375
376    /// Set the [`metrics`].
377    ///
378    /// [`metrics`]: field@Request::metrics
379    pub fn metrics(mut self, enabled: bool) -> Self {
380        self.metrics = enabled;
381        self
382    }
383
384    /// Set the [`body`].
385    ///
386    /// [`body`]: field@Request::body
387    pub fn body(mut self, body: IpcBytes) -> Self {
388        self.body = body;
389        self
390    }
391
392    /// Set the [`body`] to a plain text UTF-8 payload.  Also sets the `Content-Type` header if it is not set.
393    ///
394    /// [`body`]: field@Request::body
395    pub fn body_text(mut self, body: &str) -> Result<Self, Error> {
396        if !self.headers.contains_key("Content-Type") {
397            self = self.header("Content-Type", "text/plain; charset=utf-8")?;
398        }
399        Ok(self.body(IpcBytes::from_slice_blocking(body.as_bytes())?))
400    }
401
402    /// Set the [`body`] to a JSON payload. Also sets the `Content-Type` header if it is not set.
403    ///
404    /// [`body`]: field@Request::body
405    pub fn body_json<T: Serialize>(mut self, body: &T) -> Result<Self, Error> {
406        if !self.headers.contains_key("Content-Type") {
407            self = self.header("Content-Type", "text/json; charset=utf-8")?;
408        }
409        let body = serde_json::to_vec(body)?;
410        Ok(self.body(IpcBytes::from_vec_blocking(body)?))
411    }
412}
413impl From<Request> for http::Request<IpcBytes> {
414    fn from(mut r: Request) -> Self {
415        let mut b = http::Request::builder().uri(mem::take(&mut r.uri)).method(r.method.clone());
416        if !r.headers.is_empty() {
417            *b.headers_mut().unwrap() = mem::take(&mut r.headers);
418        }
419        let body = mem::take(&mut r.body);
420        let b = b.extension(r);
421        b.body(body).unwrap()
422    }
423}
424impl From<http::Request<IpcBytes>> for Request {
425    fn from(value: http::Request<IpcBytes>) -> Self {
426        let (mut parts, body) = value.into_parts();
427        if let Some(mut r) = parts.extensions.remove::<Request>() {
428            r.method = parts.method;
429            r.uri = parts.uri;
430            r.headers = parts.headers;
431            r.body = body;
432            r
433        } else {
434            let mut r = Request::new(parts.method, parts.uri);
435            r.headers = parts.headers;
436            r.body = body;
437            r
438        }
439    }
440}
441
/// HTTP response.
///
/// The body may still be pending when the response is created; use [`Response::download`] or one
/// of the `body*` methods to receive it.
pub struct Response {
    // returned by `status()`
    status: StatusCode,
    // returned by `header()`
    headers: header::HeaderMap,
    // returned by `effective_uri()`
    effective_uri: Uri,
    // body download state, see `ResponseBody`
    body: ResponseBody,
    // transfer metrics, exposed read-only by `metrics()`
    metrics: Var<Metrics>,
}
450impl fmt::Debug for Response {
451    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
452        f.debug_struct("Response")
453            .field("status", &self.status)
454            .field("effective_uri", &self.effective_uri)
455            .field("header", &self.headers)
456            .field("metrics", &self.metrics.get())
457            .finish_non_exhaustive()
458    }
459}
/// Body state of a [`Response`].
enum ResponseBody {
    /// Body bytes are fully available.
    ///
    /// Also the state left by `Response::download` after it takes the reader, so a failed or
    /// interrupted download ends here with default (empty) bytes.
    Done { bytes: IpcBytes },
    /// Body still to be read from this reader.
    Read { read: Box<dyn AsyncRead + Send> },
}
464impl Response {
465    /// New with body download pending or ongoing.
466    pub fn from_read(
467        status: StatusCode,
468        header: header::HeaderMap,
469        effective_uri: Uri,
470        metrics: Var<Metrics>,
471        read: Box<dyn AsyncRead + Send>,
472    ) -> Self {
473        Self {
474            status,
475            headers: header,
476            effective_uri,
477            metrics,
478            body: ResponseBody::Read { read },
479        }
480    }
481
482    /// New with body already downloaded.
483    pub fn from_done(status: StatusCode, mut headers: header::HeaderMap, effective_uri: Uri, metrics: Metrics, body: IpcBytes) -> Self {
484        if !headers.contains_key(header::CONTENT_LENGTH) {
485            headers.insert(header::CONTENT_LENGTH, body.len().into());
486        }
487        Self {
488            status,
489            headers,
490            effective_uri,
491            metrics: const_var(metrics),
492            body: ResponseBody::Done { bytes: body },
493        }
494    }
495
496    /// New with status and message body.
497    pub fn from_msg(status: StatusCode, msg: impl ToTxt) -> Self {
498        Self::from_done(
499            status,
500            header::HeaderMap::new(),
501            Uri::from_static("/"),
502            Metrics::zero(),
503            IpcBytes::from_slice_blocking(msg.to_txt().as_bytes()).unwrap(),
504        )
505    }
506
507    /// Returns the [`StatusCode`].
508    pub fn status(&self) -> StatusCode {
509        self.status
510    }
511
512    /// Returns a reference to the associated header field map.
513    pub fn header(&self) -> &header::HeaderMap {
514        &self.headers
515    }
516
517    /// Get the effective URI of this response. This value differs from the
518    /// original URI provided when making the request if at least one redirect
519    /// was followed.
520    pub fn effective_uri(&self) -> &Uri {
521        &self.effective_uri
522    }
523
524    /// Get the body bytes length if it is downloaded or `Content-Length` value if it is present in the headers.
525    pub fn content_len(&self) -> Option<ByteLength> {
526        match &self.body {
527            ResponseBody::Done { bytes, .. } => Some(bytes.len().bytes()),
528            ResponseBody::Read { .. } => {
529                let len = self
530                    .headers
531                    .get(header::CONTENT_LENGTH)?
532                    .to_str()
533                    .ok()?
534                    .parse::<usize>()
535                    .ok()?
536                    .bytes();
537                Some(len)
538            }
539        }
540    }
541
542    /// Receive the entire body.
543    pub async fn download(&mut self) -> Result<(), Error> {
544        if let ResponseBody::Done { .. } = &self.body {
545            return Ok(());
546        }
547
548        let downloader = match mem::replace(
549            &mut self.body,
550            ResponseBody::Done {
551                bytes: IpcBytes::default(),
552            },
553        ) {
554            ResponseBody::Read { read: downloader } => downloader,
555            ResponseBody::Done { .. } => unreachable!(),
556        };
557        let mut downloader = Box::into_pin(downloader);
558        let body = IpcBytes::from_read(downloader.as_mut()).await?;
559
560        self.body = ResponseBody::Done { bytes: body };
561
562        Ok(())
563    }
564
565    /// Download the full body and returns a reference to it.
566    pub async fn body(&mut self) -> Result<IpcBytes, Error> {
567        self.download().await?;
568        match &self.body {
569            ResponseBody::Done { bytes, .. } => Ok(bytes.clone()),
570            ResponseBody::Read { .. } => unreachable!(),
571        }
572    }
573
574    /// Download the full body and returns it decoded to text.
575    pub async fn body_text(&mut self) -> Result<Txt, Error> {
576        let content_type = self
577            .headers
578            .get(header::CONTENT_TYPE)
579            .and_then(|value| value.to_str().ok())
580            .and_then(|value| value.parse::<mime::Mime>().ok());
581        let encoding_name = content_type
582            .as_ref()
583            .and_then(|mime| mime.get_param("charset").map(|charset| charset.as_str()))
584            .unwrap_or("utf-8");
585
586        let bytes = self.body().await?;
587
588        let encoding = encoding_rs::Encoding::for_label(encoding_name.as_bytes()).unwrap_or(encoding_rs::UTF_8);
589        let (text, _, _) = encoding.decode(&bytes);
590        Ok(Txt::from_str(&text))
591    }
592
593    /// Download the full body and returns it decoded to JSON and deserialized to `O`.
594    pub async fn body_json<O>(&mut self) -> Result<O, Error>
595    where
596        O: serde::de::DeserializeOwned + std::marker::Unpin,
597    {
598        let bytes = self.body().await?;
599        let r = serde_json::from_slice(&bytes)?;
600        Ok(r)
601    }
602
603    /// Metrics for the task transfer, if it was enabled in the request.
604    pub fn metrics(&self) -> Var<Metrics> {
605        self.metrics.read_only()
606    }
607}
608
609/// Send a GET request to the `uri`.
610///
611/// The [`http_client`] is used to send the request.
612pub async fn get<U>(uri: U) -> Result<Response, Error>
613where
614    U: TryInto<Uri>,
615    Error: From<<U as TryInto<Uri>>::Error>,
616{
617    send(Request::get(uri)?).await
618}
619
620/// Send a GET request to the `uri` and read the response as a string.
621///
622/// The [`http_client`] is used to send the request.
623pub async fn get_txt<U>(uri: U) -> Result<Txt, Error>
624where
625    U: TryInto<Uri>,
626    Error: From<<U as TryInto<Uri>>::Error>,
627{
628    send(Request::get(uri)?).await?.body_text().await
629}
630
631/// Send a GET request to the `uri` and read the response as raw bytes.
632///
633/// The [`http_client`] is used to send the request.
634pub async fn get_bytes<U>(uri: U) -> Result<IpcBytes, Error>
635where
636    U: TryInto<Uri>,
637    Error: From<<U as TryInto<Uri>>::Error>,
638{
639    send(Request::get(uri)?).await?.body().await
640}
641
642/// Send a GET request to the `uri` and de-serializes the response.
643///
644/// The [`http_client`] is used to send the request.
645pub async fn get_json<U, O>(uri: U) -> Result<O, Error>
646where
647    U: TryInto<Uri>,
648    Error: From<<U as TryInto<Uri>>::Error>,
649    O: serde::de::DeserializeOwned + std::marker::Unpin,
650{
651    send(Request::get(uri)?).await?.body_json().await
652}
653
654/// Send a HEAD request to the `uri`.
655///
656/// The [`http_client`] is used to send the request.
657pub async fn head<U>(uri: U) -> Result<Response, Error>
658where
659    U: TryInto<Uri>,
660    Error: From<<U as TryInto<Uri>>::Error>,
661{
662    send(Request::head(uri)?).await
663}
664
665/// Send a PUT request to the `uri` with a given request body.
666///
667/// The [`http_client`] is used to send the request.
668pub async fn put<U>(uri: U, body: IpcBytes) -> Result<Response, Error>
669where
670    U: TryInto<Uri>,
671    Error: From<<U as TryInto<Uri>>::Error>,
672{
673    send(Request::put(uri)?.body(body)).await
674}
675
676/// Send a POST request to the `uri` with a given request body.
677///
678/// The [`http_client`] is used to send the request.
679pub async fn post<U>(uri: U, body: IpcBytes) -> Result<Response, Error>
680where
681    U: TryInto<Uri>,
682    Error: From<<U as TryInto<Uri>>::Error>,
683{
684    send(Request::post(uri)?.body(body)).await
685}
686
687/// Send a DELETE request to the `uri`.
688///
689/// The [`http_client`] is used to send the request.
690pub async fn delete<U>(uri: U) -> Result<Response, Error>
691where
692    U: TryInto<Uri>,
693    Error: From<<U as TryInto<Uri>>::Error>,
694{
695    send(Request::delete(uri)?).await
696}
697
698/// Send a custom [`Request`].
699///
700/// The [`http_client`] is used to send the request.
701pub async fn send(request: Request) -> Result<Response, Error> {
702    let client = http_client();
703    if client.is_cache_manager() {
704        client.send(request).await
705    } else {
706        match request.cache {
707            CacheMode::NoCache => client.send(request).await,
708            CacheMode::Default => cache::send_cache(client, request).await,
709            CacheMode::Permanent => cache::send_cache_perm(client, request).await,
710        }
711    }
712}