1#![cfg(feature = "http")]
2
3mod cache;
10mod ctx;
11mod curl;
12mod file_cache;
13mod util;
14
15pub use cache::{CacheKey, CacheMode, CachePolicy};
16pub use ctx::{HttpCache, HttpClient, http_cache, http_client, set_http_cache, set_http_client, set_request_default};
17pub use curl::CurlProcessClient;
18pub use file_cache::FileSystemCache;
19
/// Error type returned by the fallible functions of this module.
///
/// A boxed, thread-safe, dynamically typed error. Any error type that is
/// `std::error::Error + Send + Sync + 'static` converts into it with `?`.
pub type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
22
23pub use http::{
24 StatusCode, header,
25 method::{self, Method},
26 uri::{self, Uri},
27};
28use serde::{Deserialize, Serialize};
29use zng_var::{Var, const_var};
30
31use std::time::Duration;
32use std::{fmt, mem};
33
34use crate::{channel::IpcBytes, http::ctx::REQUEST_DEFAULT, io::Metrics};
35
36use super::io::AsyncRead;
37
38use zng_txt::{ToTxt, Txt};
39use zng_unit::*;
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
45#[non_exhaustive]
46pub struct Request {
47 #[serde(with = "http_serde::uri")]
49 pub uri: Uri,
50 #[serde(with = "http_serde::method")]
52 pub method: Method,
53
54 #[serde(with = "http_serde::header_map")]
58 pub headers: http::HeaderMap,
59
60 pub timeout: Duration,
71
72 pub connect_timeout: Duration,
76
77 pub low_speed_timeout: (Duration, ByteLength),
81
82 pub redirect_limit: u16,
88
89 #[cfg(feature = "http_compression")]
95 pub auto_decompress: bool,
96
97 pub max_upload_speed: ByteLength,
101
102 pub max_download_speed: ByteLength,
106
107 pub require_length: bool,
111
112 pub max_length: ByteLength,
122
123 pub cache: CacheMode,
127
128 #[cfg(feature = "http_cookie")]
134 pub cookies: bool,
135
136 pub metrics: bool,
142
143 pub body: IpcBytes,
147}
148impl Request {
149 pub fn new(method: Method, uri: Uri) -> Self {
161 match REQUEST_DEFAULT.lock().clone() {
162 Some(mut r) => {
163 r.method = method;
164 r.uri = uri;
165 r
166 }
167 None => Self {
168 uri,
169 method,
170 require_length: false,
171 max_length: ByteLength::MAX,
172 headers: header::HeaderMap::new(),
173 timeout: Duration::MAX,
174 connect_timeout: 90.secs(),
175 low_speed_timeout: (Duration::MAX, 0.bytes()),
176 redirect_limit: 20,
177 #[cfg(feature = "http_compression")]
178 auto_decompress: true,
179 max_upload_speed: ByteLength::MAX,
180 max_download_speed: ByteLength::MAX,
181 cache: CacheMode::Default,
182 #[cfg(feature = "http_cookie")]
183 cookies: false,
184 metrics: true,
185 body: IpcBytes::default(),
186 },
187 }
188 }
189
190 pub fn get<U: TryInto<Uri>>(uri: U) -> Result<Self, <U as TryInto<Uri>>::Error> {
202 Ok(Self::new(Method::GET, uri.try_into()?))
203 }
204
205 pub fn put<U: TryInto<Uri>>(uri: U) -> Result<Self, <U as TryInto<Uri>>::Error> {
217 Ok(Self::new(Method::PUT, uri.try_into()?))
218 }
219
220 pub fn post<U: TryInto<Uri>>(uri: U) -> Result<Self, <U as TryInto<Uri>>::Error> {
232 Ok(Self::new(Method::POST, uri.try_into()?))
233 }
234
235 pub fn delete<U: TryInto<Uri>>(uri: U) -> Result<Self, <U as TryInto<Uri>>::Error> {
247 Ok(Self::new(Method::DELETE, uri.try_into()?))
248 }
249
250 pub fn patch<U: TryInto<Uri>>(uri: U) -> Result<Self, <U as TryInto<Uri>>::Error> {
262 Ok(Self::new(Method::PATCH, uri.try_into()?))
263 }
264
265 pub fn head<U: TryInto<Uri>>(uri: U) -> Result<Self, <U as TryInto<Uri>>::Error> {
277 Ok(Self::new(Method::HEAD, uri.try_into()?))
278 }
279
280 pub fn header<K, V>(mut self, name: K, value: V) -> Result<Self, Error>
284 where
285 K: TryInto<header::HeaderName>,
286 V: TryInto<header::HeaderValue>,
287 Error: From<<K as TryInto<header::HeaderName>>::Error>,
288 Error: From<<V as TryInto<header::HeaderValue>>::Error>,
289 {
290 self.headers.insert(name.try_into()?, value.try_into()?);
291 Ok(self)
292 }
293
294 pub fn timeout(mut self, timeout: Duration) -> Self {
298 self.timeout = timeout;
299 self
300 }
301
302 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
306 self.connect_timeout = timeout;
307 self
308 }
309
310 pub fn low_speed_timeout(mut self, timeout: Duration, bytes_per_sec: ByteLength) -> Self {
314 self.low_speed_timeout = (timeout, bytes_per_sec);
315 self
316 }
317
318 pub fn redirect_limit(mut self, count: u16) -> Self {
322 self.redirect_limit = count;
323 self
324 }
325
326 #[cfg(feature = "http_compression")]
330 pub fn auto_decompress(mut self, enabled: bool) -> Self {
331 self.auto_decompress = enabled;
332 self
333 }
334
335 pub fn require_length(mut self, enabled: bool) -> Self {
339 self.require_length = enabled;
340 self
341 }
342
343 pub fn max_length(mut self, max: ByteLength) -> Self {
347 self.max_length = max;
348 self
349 }
350
351 pub fn max_upload_speed(mut self, bytes_per_sec: ByteLength) -> Self {
355 self.max_upload_speed = bytes_per_sec;
356 self
357 }
358
359 pub fn max_download_speed(mut self, bytes_per_sec: ByteLength) -> Self {
363 self.max_download_speed = bytes_per_sec;
364 self
365 }
366
367 #[cfg(feature = "http_compression")]
371 pub fn cookies(mut self, enable: bool) -> Self {
372 self.cookies = enable;
373 self
374 }
375
376 pub fn metrics(mut self, enabled: bool) -> Self {
380 self.metrics = enabled;
381 self
382 }
383
384 pub fn body(mut self, body: IpcBytes) -> Self {
388 self.body = body;
389 self
390 }
391
392 pub fn body_text(mut self, body: &str) -> Result<Self, Error> {
396 if !self.headers.contains_key("Content-Type") {
397 self = self.header("Content-Type", "text/plain; charset=utf-8")?;
398 }
399 Ok(self.body(IpcBytes::from_slice_blocking(body.as_bytes())?))
400 }
401
402 pub fn body_json<T: Serialize>(mut self, body: &T) -> Result<Self, Error> {
406 if !self.headers.contains_key("Content-Type") {
407 self = self.header("Content-Type", "text/json; charset=utf-8")?;
408 }
409 let body = serde_json::to_vec(body)?;
410 Ok(self.body(IpcBytes::from_vec_blocking(body)?))
411 }
412}
413impl From<Request> for http::Request<IpcBytes> {
414 fn from(mut r: Request) -> Self {
415 let mut b = http::Request::builder().uri(mem::take(&mut r.uri)).method(r.method.clone());
416 if !r.headers.is_empty() {
417 *b.headers_mut().unwrap() = mem::take(&mut r.headers);
418 }
419 let body = mem::take(&mut r.body);
420 let b = b.extension(r);
421 b.body(body).unwrap()
422 }
423}
424impl From<http::Request<IpcBytes>> for Request {
425 fn from(value: http::Request<IpcBytes>) -> Self {
426 let (mut parts, body) = value.into_parts();
427 if let Some(mut r) = parts.extensions.remove::<Request>() {
428 r.method = parts.method;
429 r.uri = parts.uri;
430 r.headers = parts.headers;
431 r.body = body;
432 r
433 } else {
434 let mut r = Request::new(parts.method, parts.uri);
435 r.headers = parts.headers;
436 r.body = body;
437 r
438 }
439 }
440}
441
442pub struct Response {
444 status: StatusCode,
445 headers: header::HeaderMap,
446 effective_uri: Uri,
447 body: ResponseBody,
448 metrics: Var<Metrics>,
449}
450impl fmt::Debug for Response {
451 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
452 f.debug_struct("Response")
453 .field("status", &self.status)
454 .field("effective_uri", &self.effective_uri)
455 .field("header", &self.headers)
456 .field("metrics", &self.metrics.get())
457 .finish_non_exhaustive()
458 }
459}
460enum ResponseBody {
461 Done { bytes: IpcBytes },
462 Read { read: Box<dyn AsyncRead + Send> },
463}
464impl Response {
465 pub fn from_read(
467 status: StatusCode,
468 header: header::HeaderMap,
469 effective_uri: Uri,
470 metrics: Var<Metrics>,
471 read: Box<dyn AsyncRead + Send>,
472 ) -> Self {
473 Self {
474 status,
475 headers: header,
476 effective_uri,
477 metrics,
478 body: ResponseBody::Read { read },
479 }
480 }
481
482 pub fn from_done(status: StatusCode, mut headers: header::HeaderMap, effective_uri: Uri, metrics: Metrics, body: IpcBytes) -> Self {
484 if !headers.contains_key(header::CONTENT_LENGTH) {
485 headers.insert(header::CONTENT_LENGTH, body.len().into());
486 }
487 Self {
488 status,
489 headers,
490 effective_uri,
491 metrics: const_var(metrics),
492 body: ResponseBody::Done { bytes: body },
493 }
494 }
495
496 pub fn from_msg(status: StatusCode, msg: impl ToTxt) -> Self {
498 Self::from_done(
499 status,
500 header::HeaderMap::new(),
501 Uri::from_static("/"),
502 Metrics::zero(),
503 IpcBytes::from_slice_blocking(msg.to_txt().as_bytes()).unwrap(),
504 )
505 }
506
507 pub fn status(&self) -> StatusCode {
509 self.status
510 }
511
512 pub fn header(&self) -> &header::HeaderMap {
514 &self.headers
515 }
516
517 pub fn effective_uri(&self) -> &Uri {
521 &self.effective_uri
522 }
523
524 pub fn content_len(&self) -> Option<ByteLength> {
526 match &self.body {
527 ResponseBody::Done { bytes, .. } => Some(bytes.len().bytes()),
528 ResponseBody::Read { .. } => {
529 let len = self
530 .headers
531 .get(header::CONTENT_LENGTH)?
532 .to_str()
533 .ok()?
534 .parse::<usize>()
535 .ok()?
536 .bytes();
537 Some(len)
538 }
539 }
540 }
541
542 pub async fn download(&mut self) -> Result<(), Error> {
544 if let ResponseBody::Done { .. } = &self.body {
545 return Ok(());
546 }
547
548 let downloader = match mem::replace(
549 &mut self.body,
550 ResponseBody::Done {
551 bytes: IpcBytes::default(),
552 },
553 ) {
554 ResponseBody::Read { read: downloader } => downloader,
555 ResponseBody::Done { .. } => unreachable!(),
556 };
557 let mut downloader = Box::into_pin(downloader);
558 let body = IpcBytes::from_read(downloader.as_mut()).await?;
559
560 self.body = ResponseBody::Done { bytes: body };
561
562 Ok(())
563 }
564
565 pub async fn body(&mut self) -> Result<IpcBytes, Error> {
567 self.download().await?;
568 match &self.body {
569 ResponseBody::Done { bytes, .. } => Ok(bytes.clone()),
570 ResponseBody::Read { .. } => unreachable!(),
571 }
572 }
573
574 pub async fn body_text(&mut self) -> Result<Txt, Error> {
576 let content_type = self
577 .headers
578 .get(header::CONTENT_TYPE)
579 .and_then(|value| value.to_str().ok())
580 .and_then(|value| value.parse::<mime::Mime>().ok());
581 let encoding_name = content_type
582 .as_ref()
583 .and_then(|mime| mime.get_param("charset").map(|charset| charset.as_str()))
584 .unwrap_or("utf-8");
585
586 let bytes = self.body().await?;
587
588 let encoding = encoding_rs::Encoding::for_label(encoding_name.as_bytes()).unwrap_or(encoding_rs::UTF_8);
589 let (text, _, _) = encoding.decode(&bytes);
590 Ok(Txt::from_str(&text))
591 }
592
593 pub async fn body_json<O>(&mut self) -> Result<O, Error>
595 where
596 O: serde::de::DeserializeOwned + std::marker::Unpin,
597 {
598 let bytes = self.body().await?;
599 let r = serde_json::from_slice(&bytes)?;
600 Ok(r)
601 }
602
603 pub fn metrics(&self) -> Var<Metrics> {
605 self.metrics.read_only()
606 }
607}
608
609pub async fn get<U>(uri: U) -> Result<Response, Error>
613where
614 U: TryInto<Uri>,
615 Error: From<<U as TryInto<Uri>>::Error>,
616{
617 send(Request::get(uri)?).await
618}
619
620pub async fn get_txt<U>(uri: U) -> Result<Txt, Error>
624where
625 U: TryInto<Uri>,
626 Error: From<<U as TryInto<Uri>>::Error>,
627{
628 send(Request::get(uri)?).await?.body_text().await
629}
630
631pub async fn get_bytes<U>(uri: U) -> Result<IpcBytes, Error>
635where
636 U: TryInto<Uri>,
637 Error: From<<U as TryInto<Uri>>::Error>,
638{
639 send(Request::get(uri)?).await?.body().await
640}
641
642pub async fn get_json<U, O>(uri: U) -> Result<O, Error>
646where
647 U: TryInto<Uri>,
648 Error: From<<U as TryInto<Uri>>::Error>,
649 O: serde::de::DeserializeOwned + std::marker::Unpin,
650{
651 send(Request::get(uri)?).await?.body_json().await
652}
653
654pub async fn head<U>(uri: U) -> Result<Response, Error>
658where
659 U: TryInto<Uri>,
660 Error: From<<U as TryInto<Uri>>::Error>,
661{
662 send(Request::head(uri)?).await
663}
664
665pub async fn put<U>(uri: U, body: IpcBytes) -> Result<Response, Error>
669where
670 U: TryInto<Uri>,
671 Error: From<<U as TryInto<Uri>>::Error>,
672{
673 send(Request::put(uri)?.body(body)).await
674}
675
676pub async fn post<U>(uri: U, body: IpcBytes) -> Result<Response, Error>
680where
681 U: TryInto<Uri>,
682 Error: From<<U as TryInto<Uri>>::Error>,
683{
684 send(Request::post(uri)?.body(body)).await
685}
686
687pub async fn delete<U>(uri: U) -> Result<Response, Error>
691where
692 U: TryInto<Uri>,
693 Error: From<<U as TryInto<Uri>>::Error>,
694{
695 send(Request::delete(uri)?).await
696}
697
698pub async fn send(request: Request) -> Result<Response, Error> {
702 let client = http_client();
703 if client.is_cache_manager() {
704 client.send(request).await
705 } else {
706 match request.cache {
707 CacheMode::NoCache => client.send(request).await,
708 CacheMode::Default => cache::send_cache(client, request).await,
709 CacheMode::Permanent => cache::send_cache_perm(client, request).await,
710 }
711 }
712}