1#![cfg(feature = "http")]
2#![expect(clippy::type_complexity)]
4
5mod cache;
25mod util;
26
27pub use cache::*;
28use zng_var::impl_from_and_into_var;
29
30use std::convert::TryFrom;
31use std::error::Error as StdError;
32use std::pin::Pin;
33use std::sync::Arc;
34use std::time::Duration;
35use std::{fmt, mem};
36
37use crate::Progress;
38
39use super::io::AsyncRead;
40
41use isahc::config::Configurable;
42pub use isahc::config::RedirectPolicy;
43pub use isahc::cookies::{Cookie, CookieJar};
44pub use isahc::http::{Method, StatusCode, Uri, header, uri};
45
46use futures_lite::io::{AsyncReadExt, BufReader};
47use isahc::{AsyncReadResponseExt, ResponseExt};
48use parking_lot::{Mutex, const_mutex};
49
50use zng_txt::{Txt, formatx};
51use zng_unit::*;
52
53#[diagnostic::on_unimplemented(note = "`TryUri` is implemented for all `T` where `Uri: TryFrom<T, Error: Into<isahc::http::Error>>`")]
57pub trait TryUri {
58 fn try_uri(self) -> Result<Uri, Error>;
60}
61impl<U> TryUri for U
62where
63 Uri: TryFrom<U>,
64 <Uri as TryFrom<U>>::Error: Into<isahc::http::Error>,
65{
66 fn try_uri(self) -> Result<Uri, Error> {
67 Uri::try_from(self).map_err(|e| e.into().into())
68 }
69}
70
71#[diagnostic::on_unimplemented(note = "`TryMethod` is implemented for all `T` where `Method: TryFrom<T, Error: Into<isahc::http::Error>>`")]
75pub trait TryMethod {
76 fn try_method(self) -> Result<Method, Error>;
78}
79impl<U> TryMethod for U
80where
81 Method: TryFrom<U>,
82 <isahc::http::Method as TryFrom<U>>::Error: Into<isahc::http::Error>,
83{
84 fn try_method(self) -> Result<Method, Error> {
85 Method::try_from(self).map_err(|e| e.into().into())
86 }
87}
88
89#[diagnostic::on_unimplemented(note = "`TryBody` is implemented for all `T` where `Body: TryFrom<T, Error: Into<isahc::http::Error>>`")]
94pub trait TryBody {
95 fn try_body(self) -> Result<Body, Error>;
97}
98impl<U> TryBody for U
99where
100 isahc::AsyncBody: TryFrom<U>,
101 <isahc::AsyncBody as TryFrom<U>>::Error: Into<isahc::http::Error>,
102{
103 fn try_body(self) -> Result<Body, Error> {
104 match isahc::AsyncBody::try_from(self) {
105 Ok(r) => Ok(Body(r)),
106 Err(e) => Err(e.into().into()),
107 }
108 }
109}
110
111#[diagnostic::on_unimplemented(
116 note = "`TryHeaderName` is implemented for all `T` where `HeaderName: TryFrom<T, Error: Into<isahc::http::Error>>`"
117)]
118pub trait TryHeaderName {
119 fn try_header_name(self) -> Result<header::HeaderName, Error>;
121}
122impl<U> TryHeaderName for U
123where
124 header::HeaderName: TryFrom<U>,
125 <header::HeaderName as TryFrom<U>>::Error: Into<isahc::http::Error>,
126{
127 fn try_header_name(self) -> Result<header::HeaderName, Error> {
128 header::HeaderName::try_from(self).map_err(|e| e.into().into())
129 }
130}
131
132#[diagnostic::on_unimplemented(
137 note = "`TryHeaderValue` is implemented for all `T` where `HeaderValue: TryFrom<T, Error: Into<isahc::http::Error>>`"
138)]
139pub trait TryHeaderValue {
140 fn try_header_value(self) -> Result<header::HeaderValue, Error>;
142}
143impl<U> TryHeaderValue for U
144where
145 header::HeaderValue: TryFrom<U>,
146 <header::HeaderValue as TryFrom<U>>::Error: Into<isahc::http::Error>,
147{
148 fn try_header_value(self) -> Result<header::HeaderValue, Error> {
149 header::HeaderValue::try_from(self).map_err(|e| e.into().into())
150 }
151}
152
153#[derive(Debug)]
157pub struct Request {
158 req: isahc::Request<Body>,
159 limits: ResponseLimits,
160}
161impl Request {
162 pub fn builder() -> RequestBuilder {
180 RequestBuilder::start(isahc::Request::builder())
181 }
182
183 pub fn get(uri: impl TryUri) -> Result<RequestBuilder, Error> {
195 Ok(RequestBuilder::start(isahc::Request::get(uri.try_uri()?)))
196 }
197
198 pub fn put(uri: impl TryUri) -> Result<RequestBuilder, Error> {
210 Ok(RequestBuilder::start(isahc::Request::put(uri.try_uri()?)))
211 }
212
213 pub fn post(uri: impl TryUri) -> Result<RequestBuilder, Error> {
225 Ok(RequestBuilder::start(isahc::Request::post(uri.try_uri()?)))
226 }
227
228 pub fn delete(uri: impl TryUri) -> Result<RequestBuilder, Error> {
240 Ok(RequestBuilder::start(isahc::Request::delete(uri.try_uri()?)))
241 }
242
243 pub fn patch(uri: impl TryUri) -> Result<RequestBuilder, Error> {
255 Ok(RequestBuilder::start(isahc::Request::patch(uri.try_uri()?)))
256 }
257
258 pub fn head(uri: impl TryUri) -> Result<RequestBuilder, Error> {
270 Ok(RequestBuilder::start(isahc::Request::head(uri.try_uri()?)))
271 }
272
273 pub fn uri(&self) -> &Uri {
275 self.req.uri()
276 }
277
278 pub fn method(&self) -> &Method {
280 self.req.method()
281 }
282
283 pub fn headers(&self) -> &header::HeaderMap {
285 self.req.headers()
286 }
287
288 pub fn clone_with(&self, body: impl TryBody) -> Result<Self, Error> {
290 let body = body.try_body()?;
291
292 let mut req = isahc::Request::new(body);
293 *req.method_mut() = self.req.method().clone();
294 *req.uri_mut() = self.req.uri().clone();
295 *req.version_mut() = self.req.version();
296 let headers = req.headers_mut();
297 for (name, value) in self.headers() {
298 headers.insert(name.clone(), value.clone());
299 }
300
301 Ok(Self {
302 req,
303 limits: self.limits.clone(),
304 })
305 }
306}
307
308#[derive(Debug, Default, Clone)]
309struct ResponseLimits {
310 max_length: Option<ByteLength>,
311 require_length: bool,
312}
313impl ResponseLimits {
314 fn check(&self, response: isahc::Response<isahc::AsyncBody>) -> Result<isahc::Response<isahc::AsyncBody>, Error> {
315 if self.require_length || self.max_length.is_some() {
316 let response = Response(response);
317 if let Some(len) = response.content_len() {
318 if let Some(max) = self.max_length {
319 if max < len {
320 return Err(Error::MaxLength {
321 content_length: Some(len),
322 max_length: max,
323 });
324 }
325 }
326 } else if self.require_length {
327 return Err(Error::RequireLength);
328 }
329
330 if let Some(max) = self.max_length {
331 let (parts, body) = response.0.into_parts();
332 let response = isahc::Response::from_parts(
333 parts,
334 isahc::AsyncBody::from_reader(super::io::ReadLimited::new(body, max, move || {
335 std::io::Error::new(std::io::ErrorKind::InvalidData, MaxLengthError(None, max))
336 })),
337 );
338
339 Ok(response)
340 } else {
341 Ok(response.0)
342 }
343 } else {
344 Ok(response)
345 }
346 }
347}
348
349#[derive(Debug)]
353pub struct RequestBuilder {
354 builder: isahc::http::request::Builder,
355 limits: ResponseLimits,
356}
357impl Default for RequestBuilder {
358 fn default() -> Self {
359 Request::builder()
360 }
361}
362impl RequestBuilder {
363 pub fn new() -> Self {
365 Request::builder()
366 }
367
368 fn start(builder: isahc::http::request::Builder) -> Self {
369 Self {
370 builder,
371 limits: ResponseLimits::default(),
372 }
373 }
374
375 pub fn method(self, method: impl TryMethod) -> Result<Self, Error> {
377 Ok(Self {
378 builder: self.builder.method(method.try_method()?),
379 limits: self.limits,
380 })
381 }
382
383 pub fn uri(self, uri: impl TryUri) -> Result<Self, Error> {
385 Ok(Self {
386 builder: self.builder.uri(uri.try_uri()?),
387 limits: self.limits,
388 })
389 }
390
391 pub fn header(self, name: impl TryHeaderName, value: impl TryHeaderValue) -> Result<Self, Error> {
393 Ok(Self {
394 builder: self.builder.header(name.try_header_name()?, value.try_header_value()?),
395 limits: self.limits,
396 })
397 }
398
399 pub fn cookie_jar(self, cookie_jar: CookieJar) -> Self {
403 Self {
404 builder: self.builder.cookie_jar(cookie_jar),
405 limits: self.limits,
406 }
407 }
408
409 pub fn timeout(self, timeout: Duration) -> Self {
420 Self {
421 builder: self.builder.timeout(timeout),
422 limits: self.limits,
423 }
424 }
425
426 pub fn connect_timeout(self, timeout: Duration) -> Self {
430 Self {
431 builder: self.builder.connect_timeout(timeout),
432 limits: self.limits,
433 }
434 }
435
436 pub fn low_speed_timeout(self, low_speed: u32, timeout: Duration) -> Self {
440 Self {
441 builder: self.builder.low_speed_timeout(low_speed, timeout),
442 limits: self.limits,
443 }
444 }
445
446 pub fn redirect_policy(self, policy: RedirectPolicy) -> Self {
452 if !matches!(policy, RedirectPolicy::None) {
453 Self {
454 builder: self.builder.redirect_policy(policy).auto_referer(),
455 limits: self.limits,
456 }
457 } else {
458 Self {
459 builder: self.builder.redirect_policy(policy),
460 limits: self.limits,
461 }
462 }
463 }
464
465 pub fn auto_decompress(self, enabled: bool) -> Self {
473 Self {
474 builder: self.builder.automatic_decompression(enabled),
475 limits: self.limits,
476 }
477 }
478
479 pub fn max_upload_speed(self, max: u64) -> Self {
481 Self {
482 builder: self.builder.max_upload_speed(max),
483 limits: self.limits,
484 }
485 }
486
487 pub fn max_download_speed(self, max: u64) -> Self {
489 Self {
490 builder: self.builder.max_download_speed(max),
491 limits: self.limits,
492 }
493 }
494
495 pub fn max_length(mut self, max: ByteLength) -> Self {
505 self.limits.max_length = Some(max);
506 self
507 }
508
509 pub fn require_length(mut self, require: bool) -> Self {
511 self.limits.require_length = require;
512 self
513 }
514
515 pub fn metrics(self, enable: bool) -> Self {
521 Self {
522 builder: self.builder.metrics(enable),
523 limits: self.limits,
524 }
525 }
526
527 pub fn build(self) -> Request {
529 self.body(()).unwrap()
530 }
531
532 pub fn body(self, body: impl TryBody) -> Result<Request, Error> {
534 Ok(Request {
535 req: self.builder.body(body.try_body()?).unwrap(),
536 limits: self.limits,
537 })
538 }
539
540 pub fn build_custom<F>(self, custom: F) -> Result<Request, Error>
544 where
545 F: FnOnce(isahc::http::request::Builder) -> isahc::http::Result<isahc::Request<isahc::AsyncBody>>,
546 {
547 let req = custom(self.builder)?;
548 Ok(Request {
549 req: req.map(Body),
550 limits: self.limits,
551 })
552 }
553}
554
/// Head of a response, as produced by [`Response::into_parts`] and consumed by [`Response::from_parts`].
pub type ResponseParts = isahc::http::response::Parts;
557
558#[derive(Debug)]
560pub struct Response(isahc::Response<isahc::AsyncBody>);
561impl Response {
562 pub fn status(&self) -> StatusCode {
564 self.0.status()
565 }
566
567 pub fn headers(&self) -> &header::HeaderMap<header::HeaderValue> {
569 self.0.headers()
570 }
571
572 pub fn content_len(&self) -> Option<ByteLength> {
574 self.0.body().len().map(|l| ByteLength(l as usize))
575 }
576
577 pub fn cookie_jar(&self) -> Option<&CookieJar> {
581 self.0.cookie_jar()
582 }
583
584 pub async fn text(&mut self) -> std::io::Result<Txt> {
586 self.0.text().await.map(Txt::from)
587 }
588
589 pub fn effective_uri(&self) -> Option<&Uri> {
593 self.0.effective_uri()
594 }
595
596 pub async fn bytes(&mut self) -> std::io::Result<Vec<u8>> {
598 Body::bytes_impl(self.0.body_mut()).await
599 }
600
601 pub async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
603 BufReader::new(self.0.body_mut()).read(buf).await
604 }
605
606 pub async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
608 BufReader::new(self.0.body_mut()).read_exact(buf).await
609 }
610
611 pub async fn json<O>(&mut self) -> Result<O, serde_json::Error>
613 where
614 O: serde::de::DeserializeOwned + std::marker::Unpin,
615 {
616 self.0.json().await
617 }
618
619 pub fn metrics(&self) -> Metrics {
624 self.0.metrics().map(Metrics::from_isahc).unwrap_or_else(Metrics::zero)
625 }
626
627 pub async fn consume(&mut self) -> std::io::Result<()> {
636 self.0.consume().await
637 }
638
639 pub fn new_message(status: impl Into<StatusCode>, msg: impl Into<String>) -> Self {
641 let status = status.into();
642 let msg = msg.into().into_bytes();
643 let msg = futures_lite::io::Cursor::new(msg);
644 let mut r = isahc::Response::new(isahc::AsyncBody::from_reader(msg));
645 *r.status_mut() = status;
646 Self(r)
647 }
648
649 pub fn new(status: StatusCode, headers: header::HeaderMap<header::HeaderValue>, body: Body) -> Self {
651 let mut r = isahc::Response::new(body.0);
652 *r.status_mut() = status;
653 *r.headers_mut() = headers;
654 Self(r)
655 }
656
657 pub fn into_parts(self) -> (ResponseParts, Body) {
659 let (p, b) = self.0.into_parts();
660 (p, Body(b))
661 }
662
663 pub fn from_parts(parts: ResponseParts, body: Body) -> Self {
665 Self(isahc::Response::from_parts(parts, body.0))
666 }
667}
668impl From<Response> for isahc::Response<isahc::AsyncBody> {
669 fn from(r: Response) -> Self {
670 r.0
671 }
672}
673
674#[derive(Debug, Default)]
678pub struct Body(isahc::AsyncBody);
679impl Body {
680 pub fn empty() -> Body {
684 Body(isahc::AsyncBody::empty())
685 }
686
687 pub fn from_bytes_static(bytes: impl AsRef<[u8]> + 'static) -> Self {
694 Body(isahc::AsyncBody::from_bytes_static(bytes))
695 }
696
697 pub fn from_reader(read: impl AsyncRead + Send + Sync + 'static) -> Self {
699 Body(isahc::AsyncBody::from_reader(read))
700 }
701
702 pub fn from_reader_sized(read: impl AsyncRead + Send + Sync + 'static, size: u64) -> Self {
704 Body(isahc::AsyncBody::from_reader_sized(read, size))
705 }
706
707 pub fn is_empty(&self) -> bool {
713 self.0.is_empty()
714 }
715
716 pub fn len(&self) -> Option<u64> {
718 self.0.len()
719 }
720
721 pub fn reset(&mut self) -> bool {
725 self.0.reset()
726 }
727
728 pub async fn bytes(&mut self) -> std::io::Result<Vec<u8>> {
730 Self::bytes_impl(&mut self.0).await
731 }
732 async fn bytes_impl(body: &mut isahc::AsyncBody) -> std::io::Result<Vec<u8>> {
733 let cap = body.len().unwrap_or(1024);
734 let mut bytes = Vec::with_capacity(cap as usize);
735 super::io::copy(body, &mut bytes).await?;
736 Ok(bytes)
737 }
738
739 pub async fn text_utf8(&mut self) -> Result<Txt, Box<dyn std::error::Error>> {
743 let bytes = self.bytes().await?;
744 let r = String::from_utf8(bytes)?;
745 Ok(Txt::from(r))
746 }
747
748 pub async fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
750 BufReader::new(&mut self.0).read(buf).await
751 }
752
753 pub async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
755 BufReader::new(&mut self.0).read_exact(buf).await
756 }
757}
758impl From<Body> for isahc::AsyncBody {
759 fn from(r: Body) -> Self {
760 r.0
761 }
762}
763impl From<isahc::AsyncBody> for Body {
764 fn from(r: isahc::AsyncBody) -> Self {
765 Body(r)
766 }
767}
768impl From<()> for Body {
769 fn from(body: ()) -> Self {
770 Body(body.into())
771 }
772}
773impl From<String> for Body {
774 fn from(body: String) -> Self {
775 Body(body.into())
776 }
777}
778impl From<Txt> for Body {
779 fn from(body: Txt) -> Self {
780 Body(String::from(body).into())
781 }
782}
783impl From<Vec<u8>> for Body {
784 fn from(body: Vec<u8>) -> Self {
785 Body(body.into())
786 }
787}
788impl From<&'_ [u8]> for Body {
789 fn from(body: &[u8]) -> Self {
790 body.to_vec().into()
791 }
792}
793impl From<&'_ str> for Body {
794 fn from(body: &str) -> Self {
795 body.as_bytes().into()
796 }
797}
798impl<T: Into<Self>> From<Option<T>> for Body {
799 fn from(body: Option<T>) -> Self {
800 match body {
801 Some(body) => body.into(),
802 None => Self::empty(),
803 }
804 }
805}
806impl AsyncRead for Body {
807 fn poll_read(
808 self: std::pin::Pin<&mut Self>,
809 cx: &mut std::task::Context<'_>,
810 buf: &mut [u8],
811 ) -> std::task::Poll<std::io::Result<usize>> {
812 Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
813 }
814}
815
816pub async fn get(uri: impl TryUri) -> Result<Response, Error> {
820 default_client().get(uri).await
821}
822
823pub async fn get_txt(uri: impl TryUri) -> Result<Txt, Error> {
827 default_client().get_txt(uri).await
828}
829
830pub async fn get_bytes(uri: impl TryUri) -> Result<Vec<u8>, Error> {
834 default_client().get_bytes(uri).await
835}
836
837pub async fn get_json<O>(uri: impl TryUri) -> Result<O, Box<dyn std::error::Error>>
841where
842 O: serde::de::DeserializeOwned + std::marker::Unpin,
843{
844 default_client().get_json(uri).await
845}
846
847pub async fn head(uri: impl TryUri) -> Result<Response, Error> {
851 default_client().head(uri).await
852}
853
854pub async fn put(uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
858 default_client().put(uri, body).await
859}
860
861pub async fn post(uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
865 default_client().post(uri, body).await
866}
867
868pub async fn delete(uri: impl TryUri) -> Result<Response, Error> {
872 default_client().delete(uri).await
873}
874
875pub async fn send(request: Request) -> Result<Response, Error> {
879 default_client().send(request).await
880}
881
882pub fn default_client() -> &'static Client {
892 use once_cell::sync::Lazy;
893
894 static SHARED: Lazy<Client> = Lazy::new(|| {
895 let ci = mem::replace(&mut *CLIENT_INIT.lock(), ClientInit::Inited);
896 if let ClientInit::Set(init) = ci {
897 init()
898 } else {
899 Client::new()
901 }
902 });
903 &SHARED
904}
905
/// Initializer state of the shared default client, consumed by `default_client` on first use.
static CLIENT_INIT: Mutex<ClientInit> = const_mutex(ClientInit::None);

/// State of the default client initializer.
enum ClientInit {
    /// No custom initializer was registered.
    None,
    /// Initializer registered by `set_default_client_init` that has not run yet.
    Set(Box<dyn FnOnce() -> Client + Send>),
    /// The default client initialization already started, new initializers are rejected.
    Inited,
}
913
914pub fn set_default_client_init<I>(init: I) -> Result<(), DefaultAlreadyInitedError>
923where
924 I: FnOnce() -> Client + Send + 'static,
925{
926 let mut ci = CLIENT_INIT.lock();
927 if let ClientInit::Inited = &*ci {
928 Err(DefaultAlreadyInitedError)
929 } else {
930 *ci = ClientInit::Set(Box::new(init));
931 Ok(())
932 }
933}
934
/// Error returned when the default client initializer is set after the client was already created.
#[derive(Debug, Clone, Copy)]
pub struct DefaultAlreadyInitedError;
impl fmt::Display for DefaultAlreadyInitedError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("default client already initialized, can only set before first use")
    }
}
impl std::error::Error for DefaultAlreadyInitedError {}
944
945#[derive(Debug, Clone, PartialEq, Eq)]
947pub struct Metrics {
948 pub upload_progress: (ByteLength, ByteLength),
950
951 pub upload_speed: ByteLength,
953
954 pub download_progress: (ByteLength, ByteLength),
956
957 pub download_speed: ByteLength,
959
960 pub name_lookup_time: Duration,
964
965 pub connect_time: Duration,
969
970 pub secure_connect_time: Duration,
974
975 pub transfer_start_time: Duration,
979
980 pub transfer_time: Duration,
985
986 pub total_time: Duration,
991
992 pub redirect_time: Duration,
995}
996impl Metrics {
997 pub fn from_isahc(m: &isahc::Metrics) -> Self {
999 Self {
1000 upload_progress: {
1001 let (c, t) = m.upload_progress();
1002 ((c as usize).bytes(), (t as usize).bytes())
1003 },
1004 upload_speed: (m.upload_speed().round() as usize).bytes(),
1005 download_progress: {
1006 let (c, t) = m.download_progress();
1007 ((c as usize).bytes(), (t as usize).bytes())
1008 },
1009 download_speed: (m.download_speed().round() as usize).bytes(),
1010 name_lookup_time: m.name_lookup_time(),
1011 connect_time: m.connect_time(),
1012 secure_connect_time: m.secure_connect_time(),
1013 transfer_start_time: m.transfer_start_time(),
1014 transfer_time: m.transfer_time(),
1015 total_time: m.total_time(),
1016 redirect_time: m.redirect_time(),
1017 }
1018 }
1019
1020 pub fn zero() -> Self {
1022 Self {
1023 upload_progress: (0.bytes(), 0.bytes()),
1024 upload_speed: 0.bytes(),
1025 download_progress: (0.bytes(), 0.bytes()),
1026 download_speed: 0.bytes(),
1027 name_lookup_time: Duration::ZERO,
1028 connect_time: Duration::ZERO,
1029 secure_connect_time: Duration::ZERO,
1030 transfer_start_time: Duration::ZERO,
1031 transfer_time: Duration::ZERO,
1032 total_time: Duration::ZERO,
1033 redirect_time: Duration::ZERO,
1034 }
1035 }
1036}
1037impl From<isahc::Metrics> for Metrics {
1038 fn from(m: isahc::Metrics) -> Self {
1039 Metrics::from_isahc(&m)
1040 }
1041}
1042impl fmt::Display for Metrics {
1043 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1044 let mut ws = false; if self.upload_progress.0 != self.upload_progress.1 {
1047 write!(
1048 f,
1049 "↑ {} - {}, {}/s",
1050 self.upload_progress.0, self.upload_progress.1, self.upload_speed
1051 )?;
1052 ws = true;
1053 }
1054 if self.download_progress.0 != self.download_progress.1 {
1055 write!(
1056 f,
1057 "{}↓ {} - {}, {}/s",
1058 if ws { "\n" } else { "" },
1059 self.download_progress.0,
1060 self.download_progress.1,
1061 self.download_speed
1062 )?;
1063 ws = true;
1064 }
1065
1066 if !ws {
1067 if self.upload_progress.1.bytes() > 0 {
1068 write!(f, "↑ {}", self.upload_progress.1)?;
1069 ws = true;
1070 }
1071 if self.download_progress.1.bytes() > 0 {
1072 write!(f, "{}↓ {}", if ws { "\n" } else { "" }, self.download_progress.1)?;
1073 ws = true;
1074 }
1075
1076 if ws {
1077 write!(f, "\n{:?}", self.total_time)?;
1078 }
1079 }
1080
1081 Ok(())
1082 }
1083}
1084impl_from_and_into_var! {
1085 fn from(metrics: Metrics) -> Progress {
1086 let mut status = Progress::indeterminate();
1087 if metrics.download_progress.1 > 0.bytes() {
1088 status = Progress::from_n_of(metrics.download_progress.0 .0, metrics.download_progress.1 .0);
1089 }
1090 if metrics.upload_progress.1 > 0.bytes() {
1091 let u_status = Progress::from_n_of(metrics.upload_progress.0 .0, metrics.upload_progress.1 .0);
1092 if status.is_indeterminate() {
1093 status = u_status;
1094 } else {
1095 status = status.and_fct(u_status.fct());
1096 }
1097 }
1098 status.with_msg(formatx!("{metrics}")).with_meta_mut(|mut m| {
1099 m.set(*METRICS_ID, metrics);
1100 })
1101 }
1102}
zng_state_map::static_id! {
    /// ID of the [`Metrics`] value stored in the metadata of a `Progress` converted from metrics.
    pub static ref METRICS_ID: zng_state_map::StateId<Metrics>;
}
1107
1108pub struct Client {
1112 client: isahc::HttpClient,
1113 cache: Option<Box<dyn CacheDb>>,
1114 cache_mode: Arc<dyn Fn(&Request) -> CacheMode + Send + Sync>,
1115}
1116impl Default for Client {
1117 fn default() -> Self {
1118 Self::new()
1119 }
1120}
1121impl Clone for Client {
1122 fn clone(&self) -> Self {
1123 Client {
1124 client: self.client.clone(),
1125 cache: self.cache.as_ref().map(|b| b.clone_boxed()),
1126 cache_mode: self.cache_mode.clone(),
1127 }
1128 }
1129}
1130impl fmt::Debug for Client {
1131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1132 f.debug_struct("Client").finish_non_exhaustive()
1133 }
1134}
1135impl Client {
1136 pub fn new() -> Self {
1141 Client::builder()
1142 .cookies()
1143 .redirect_policy(RedirectPolicy::Limit(20))
1144 .connect_timeout(90.secs())
1145 .metrics(true)
1146 .build()
1147 }
1148
1149 pub fn builder() -> ClientBuilder {
1151 ClientBuilder {
1152 builder: isahc::HttpClient::builder(),
1153 cache: None,
1154 cache_mode: None,
1155 }
1156 }
1157
1158 pub fn cookie_jar(&self) -> Option<&CookieJar> {
1160 self.client.cookie_jar()
1161 }
1162
1163 pub async fn get(&self, uri: impl TryUri) -> Result<Response, Error> {
1165 self.send(Request::get(uri)?.build()).await
1166 }
1167
1168 pub async fn get_txt(&self, uri: impl TryUri) -> Result<Txt, Error> {
1170 let mut r = self.get(uri).await?;
1171 let r = r.text().await?;
1172 Ok(r)
1173 }
1174
1175 pub async fn get_bytes(&self, uri: impl TryUri) -> Result<Vec<u8>, Error> {
1177 let mut r = self.get(uri).await?;
1178 let r = r.bytes().await?;
1179 Ok(r)
1180 }
1181
1182 pub async fn get_json<O>(&self, uri: impl TryUri) -> Result<O, Box<dyn std::error::Error>>
1184 where
1185 O: serde::de::DeserializeOwned + std::marker::Unpin,
1186 {
1187 let mut r = self.get(uri).await?;
1188 let r = r.json::<O>().await?;
1189 Ok(r)
1190 }
1191
1192 pub async fn head(&self, uri: impl TryUri) -> Result<Response, Error> {
1194 self.send(Request::head(uri)?.build()).await
1195 }
1196 pub async fn put(&self, uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
1198 self.send(Request::put(uri)?.body(body)?).await
1199 }
1200
1201 pub async fn post(&self, uri: impl TryUri, body: impl TryBody) -> Result<Response, Error> {
1203 self.send(Request::post(uri)?.body(body)?).await
1204 }
1205
1206 pub async fn delete(&self, uri: impl TryUri) -> Result<Response, Error> {
1208 self.send(Request::delete(uri)?.build()).await
1209 }
1210
1211 pub async fn send(&self, request: Request) -> Result<Response, Error> {
1221 if let Some(db) = &self.cache {
1222 match self.cache_mode(&request) {
1223 CacheMode::NoCache => {
1224 let response = self.client.send_async(request.req).await?;
1225 let response = request.limits.check(response)?;
1226 Ok(Response(response))
1227 }
1228 CacheMode::Default => self.send_cache_default(&**db, request, 0).await,
1229 CacheMode::Permanent => self.send_cache_permanent(&**db, request, 0).await,
1230 CacheMode::Error(e) => Err(e),
1231 }
1232 } else {
1233 let response = self.client.send_async(request.req).await?;
1234 let response = request.limits.check(response)?;
1235 Ok(Response(response))
1236 }
1237 }
1238
    /// Sends `request` using the cache `db` with the policy based (default) cache mode.
    ///
    /// Fresh cached entries are served from `db`, stale entries are revalidated with a request
    /// and responses are stored when the computed policy says so. When the cache fails to
    /// provide or store a body the entry is removed and the send is retried with
    /// `retry_count + 1`; at `retry_count == 3` the cache is skipped.
    #[async_recursion::async_recursion]
    async fn send_cache_default(&self, db: &dyn CacheDb, request: Request, retry_count: u8) -> Result<Response, Error> {
        // Too many cache failures, send directly.
        if retry_count == 3 {
            tracing::error!("retried cache 3 times, skipping cache");
            let response = self.client.send_async(request.req).await?;
            let response = request.limits.check(response)?;
            return Ok(Response(response));
        }

        let key = CacheKey::new(&request.req);
        if let Some(policy) = db.policy(&key).await {
            // There is a cached policy for the key, the policy decides if the entry can be used.
            match policy.before_request(&request.req) {
                BeforeRequest::Fresh(parts) => {
                    if let Some(body) = db.body(&key).await {
                        let response = isahc::Response::from_parts(parts, body.0);
                        let response = request.limits.check(response)?;

                        Ok(Response(response))
                    } else {
                        tracing::error!("cache returned policy but not body");
                        db.remove(&key).await;
                        self.send_cache_default(db, request, retry_count + 1).await
                    }
                }
                BeforeRequest::Stale { request: parts, matches } => {
                    if matches {
                        // Send the request head provided by the policy with the original body.
                        let (_, body) = request.req.into_parts();
                        let request = Request {
                            req: isahc::Request::from_parts(parts, body),
                            limits: request.limits,
                        };
                        // Body-less copy, used to evaluate the policy and as the retry request.
                        let policy_request = request.clone_with(()).unwrap().req;
                        // Auto-retry is only possible when the original request had no body.
                        let no_req_body = request.req.body().len().map(|l| l == 0).unwrap_or(false);

                        let response = self.client.send_async(request.req).await?;
                        let response = request.limits.check(response)?;

                        match policy.after_response(&policy_request, &response) {
                            AfterResponse::NotModified(policy, parts) => {
                                if let Some(body) = db.body(&key).await {
                                    // NOTE(review): unlike the `Fresh` path, the cached body is not
                                    // passed through `limits.check` here, confirm this is intended.
                                    let response = isahc::Response::from_parts(parts, body.0);

                                    db.set_policy(&key, policy).await;

                                    Ok(Response(response))
                                } else {
                                    tracing::error!("cache returned policy but not body");
                                    db.remove(&key).await;

                                    if no_req_body {
                                        self.send_cache_default(
                                            db,
                                            Request {
                                                req: policy_request,
                                                limits: request.limits,
                                            },
                                            retry_count + 1,
                                        )
                                        .await
                                    } else {
                                        Err(std::io::Error::new(
                                            std::io::ErrorKind::NotFound,
                                            "cache returned policy but not body, cannot auto-retry",
                                        )
                                        .into())
                                    }
                                }
                            }
                            AfterResponse::Modified(policy, parts) => {
                                if policy.should_store() {
                                    // The body goes through the cache, the response is rebuilt
                                    // with the body returned by `db.set`.
                                    let (_, body) = response.into_parts();
                                    if let Some(body) = db.set(&key, policy, Body(body)).await {
                                        let response = isahc::Response::from_parts(parts, body.0);

                                        Ok(Response(response))
                                    } else {
                                        tracing::error!("cache db failed to store body");
                                        db.remove(&key).await;

                                        if no_req_body {
                                            self.send_cache_default(
                                                db,
                                                Request {
                                                    req: policy_request,
                                                    limits: request.limits,
                                                },
                                                retry_count + 1,
                                            )
                                            .await
                                        } else {
                                            Err(std::io::Error::new(
                                                std::io::ErrorKind::NotFound,
                                                "cache db failed to store body, cannot auto-retry",
                                            )
                                            .into())
                                        }
                                    }
                                } else {
                                    // The new response must not be stored, drop the old entry.
                                    db.remove(&key).await;

                                    Ok(Response(response))
                                }
                            }
                        }
                    } else {
                        // The cached entry is for a different request, drop it and send directly.
                        tracing::error!("cache policy did not match request, {request:?}");
                        db.remove(&key).await;
                        let response = self.client.send_async(request.req).await?;
                        let response = request.limits.check(response)?;
                        Ok(Response(response))
                    }
                }
            }
        } else {
            // Nothing cached for the key, send and store the response if the new policy allows.
            let no_req_body = request.req.body().len().map(|l| l == 0).unwrap_or(false);
            let policy_request = request.clone_with(()).unwrap().req;

            let response = self.client.send_async(request.req).await?;
            let response = request.limits.check(response)?;

            let policy = CachePolicy::new(&policy_request, &response);

            if policy.should_store() {
                let (parts, body) = response.into_parts();

                if let Some(body) = db.set(&key, policy, Body(body)).await {
                    let response = isahc::Response::from_parts(parts, body.0);

                    Ok(Response(response))
                } else {
                    tracing::error!("cache db failed to store body");
                    db.remove(&key).await;

                    if no_req_body {
                        self.send_cache_default(
                            db,
                            Request {
                                req: policy_request,
                                limits: request.limits,
                            },
                            retry_count + 1,
                        )
                        .await
                    } else {
                        Err(std::io::Error::new(std::io::ErrorKind::NotFound, "cache db failed to store body, cannot auto-retry").into())
                    }
                }
            } else {
                Ok(Response(response))
            }
        }
    }
1391
    /// Sends `request` using the cache `db` with the permanent cache mode.
    ///
    /// A cached body is always reused once stored; the stored policy is replaced by a permanent
    /// policy. When the cache fails to provide or store a body the entry is removed and the
    /// send is retried with `retry_count + 1`; at `retry_count == 3` the cache is skipped.
    #[async_recursion::async_recursion]
    async fn send_cache_permanent(&self, db: &dyn CacheDb, request: Request, retry_count: u8) -> Result<Response, Error> {
        // Too many cache failures, send directly.
        if retry_count == 3 {
            tracing::error!("retried cache 3 times, skipping cache");
            let response = self.client.send_async(request.req).await?;
            let response = request.limits.check(response)?;
            return Ok(Response(response));
        }

        let key = CacheKey::new(&request.req);
        if let Some(policy) = db.policy(&key).await {
            if let Some(body) = db.body(&key).await {
                match policy.before_request(&request.req) {
                    BeforeRequest::Fresh(p) => {
                        let response = isahc::Response::from_parts(p, body.0);
                        let response = request.limits.check(response)?;

                        // Upgrade an entry that was stored with a non-permanent policy.
                        if !policy.is_permanent() {
                            db.set_policy(&key, CachePolicy::new_permanent(&response)).await;
                        }

                        Ok(Response(response))
                    }
                    BeforeRequest::Stale { request: parts, .. } => {
                        let limits = request.limits.clone();

                        // Send the request head provided by the policy with the original body.
                        let (_, req_body) = request.req.into_parts();
                        let request = isahc::Request::from_parts(parts, req_body);

                        let response = self.client.send_async(request).await?;
                        let response = limits.check(response)?;

                        // Only the head of the new response is used, the body is the cached one.
                        let (parts, _) = response.into_parts();

                        let response = isahc::Response::from_parts(parts, body.0);

                        db.set_policy(&key, CachePolicy::new_permanent(&response)).await;

                        Ok(Response(response))
                    }
                }
            } else {
                tracing::error!("cache returned policy but not body");
                db.remove(&key).await;
                self.send_cache_permanent(db, request, retry_count + 1).await
            }
        } else {
            // Nothing cached for the key. Keep a body-less copy for the retry, this is only
            // possible when the request body length is known to be zero.
            let backup_request = if request.req.body().len().map(|l| l == 0).unwrap_or(false) {
                Some(request.clone_with(()).unwrap())
            } else {
                None
            };

            let response = self.client.send_async(request.req).await?;
            let response = request.limits.check(response)?;
            let policy = CachePolicy::new_permanent(&response);

            let (parts, body) = response.into_parts();

            // The body goes through the cache, the response is rebuilt with the body
            // returned by `db.set`.
            if let Some(body) = db.set(&key, policy, Body(body)).await {
                let response = isahc::Response::from_parts(parts, body.0);
                Ok(Response(response))
            } else {
                tracing::error!("cache db failed to store body");
                db.remove(&key).await;

                if let Some(request) = backup_request {
                    self.send_cache_permanent(db, request, retry_count + 1).await
                } else {
                    Err(std::io::Error::new(
                        std::io::ErrorKind::NotFound,
                        "cache db failed to store permanent body, cannot auto-retry",
                    )
                    .into())
                }
            }
        }
    }
1472
1473 pub fn cache(&self) -> Option<&dyn CacheDb> {
1475 self.cache.as_deref()
1476 }
1477
1478 pub fn cache_mode(&self, request: &Request) -> CacheMode {
1480 if self.cache.is_none() || request.method() != Method::GET {
1481 CacheMode::NoCache
1482 } else {
1483 (self.cache_mode)(request)
1484 }
1485 }
1486}
1487impl From<Client> for isahc::HttpClient {
1488 fn from(c: Client) -> Self {
1489 c.client
1490 }
1491}
1492impl From<isahc::HttpClient> for Client {
1493 fn from(client: isahc::HttpClient) -> Self {
1494 Self {
1495 client,
1496 cache: None,
1497 cache_mode: Arc::new(|_| CacheMode::default()),
1498 }
1499 }
1500}
1501
/// Builder for [`Client`].
///
/// Collects the isahc client options, an optional cache database and an optional cache mode
/// selector, use [`build`](ClientBuilder::build) to create the client.
pub struct ClientBuilder {
    // underlying isahc builder, receives the connection options.
    builder: isahc::HttpClientBuilder,
    // cache database handed to the built client, `None` if no cache was set.
    cache: Option<Box<dyn CacheDb>>,
    // cache mode selector, when `None` the build methods use `CacheMode::default()` for every request.
    cache_mode: Option<Arc<dyn Fn(&Request) -> CacheMode + Send + Sync>>,
}
1518impl Default for ClientBuilder {
1519 fn default() -> Self {
1520 Client::builder()
1521 }
1522}
1523impl ClientBuilder {
1524 pub fn new() -> Self {
1526 Client::builder()
1527 }
1528
1529 pub fn build(self) -> Client {
1531 Client {
1532 client: self.builder.build().unwrap(),
1533 cache: self.cache,
1534 cache_mode: self.cache_mode.unwrap_or_else(|| Arc::new(|_| CacheMode::default())),
1535 }
1536 }
1537
1538 pub fn build_custom<F>(self, custom: F) -> Result<Client, Error>
1542 where
1543 F: FnOnce(isahc::HttpClientBuilder) -> Result<isahc::HttpClient, Error>,
1544 {
1545 custom(self.builder).map(|c| Client {
1546 client: c,
1547 cache: self.cache,
1548 cache_mode: self.cache_mode.unwrap_or_else(|| Arc::new(|_| CacheMode::default())),
1549 })
1550 }
1551
1552 pub fn default_header(self, key: impl TryHeaderName, value: impl TryHeaderValue) -> Result<Self, Error> {
1554 Ok(Self {
1555 builder: self.builder.default_header(key.try_header_name()?, value.try_header_value()?),
1556 cache: self.cache,
1557 cache_mode: self.cache_mode,
1558 })
1559 }
1560
1561 pub fn cookies(self) -> Self {
1563 Self {
1564 builder: self.builder.cookies(),
1565 cache: self.cache,
1566 cache_mode: self.cache_mode,
1567 }
1568 }
1569
1570 pub fn cookie_jar(self, cookie_jar: CookieJar) -> Self {
1574 Self {
1575 builder: self.builder.cookie_jar(cookie_jar),
1576 cache: self.cache,
1577 cache_mode: self.cache_mode,
1578 }
1579 }
1580
1581 pub fn timeout(self, timeout: Duration) -> Self {
1592 Self {
1593 builder: self.builder.timeout(timeout),
1594 cache: self.cache,
1595 cache_mode: self.cache_mode,
1596 }
1597 }
1598
1599 pub fn connect_timeout(self, timeout: Duration) -> Self {
1603 Self {
1604 builder: self.builder.connect_timeout(timeout),
1605 cache: self.cache,
1606 cache_mode: self.cache_mode,
1607 }
1608 }
1609
1610 pub fn low_speed_timeout(self, low_speed: u32, timeout: Duration) -> Self {
1614 Self {
1615 builder: self.builder.low_speed_timeout(low_speed, timeout),
1616 cache: self.cache,
1617 cache_mode: self.cache_mode,
1618 }
1619 }
1620
1621 pub fn redirect_policy(self, policy: RedirectPolicy) -> Self {
1625 if !matches!(policy, RedirectPolicy::None) {
1626 Self {
1627 builder: self.builder.redirect_policy(policy).auto_referer(),
1628 cache: self.cache,
1629 cache_mode: self.cache_mode,
1630 }
1631 } else {
1632 Self {
1633 builder: self.builder.redirect_policy(policy),
1634 cache: self.cache,
1635 cache_mode: self.cache_mode,
1636 }
1637 }
1638 }
1639
1640 pub fn auto_decompress(self, enabled: bool) -> Self {
1648 Self {
1649 builder: self.builder.automatic_decompression(enabled),
1650 cache: self.cache,
1651 cache_mode: self.cache_mode,
1652 }
1653 }
1654
1655 pub fn max_upload_speed(self, max: u64) -> Self {
1657 Self {
1658 builder: self.builder.max_upload_speed(max),
1659 cache: self.cache,
1660 cache_mode: self.cache_mode,
1661 }
1662 }
1663
1664 pub fn max_download_speed(self, max: u64) -> Self {
1666 Self {
1667 builder: self.builder.max_download_speed(max),
1668 cache: self.cache,
1669 cache_mode: self.cache_mode,
1670 }
1671 }
1672
1673 pub fn metrics(self, enable: bool) -> Self {
1679 Self {
1680 builder: self.builder.metrics(enable),
1681 cache: self.cache,
1682 cache_mode: self.cache_mode,
1683 }
1684 }
1685
1686 pub fn cache(self, cache: impl CacheDb) -> Self {
1690 Self {
1691 builder: self.builder,
1692 cache: Some(Box::new(cache)),
1693 cache_mode: self.cache_mode,
1694 }
1695 }
1696
1697 pub fn cache_mode(self, selector: impl Fn(&Request) -> CacheMode + Send + Sync + 'static) -> Self {
1706 Self {
1707 builder: self.builder,
1708 cache: self.cache,
1709 cache_mode: Some(Arc::new(selector)),
1710 }
1711 }
1712}
1713
/// Error of the HTTP client operations.
///
/// Converts from [`isahc::Error`], [`isahc::http::Error`] and [`std::io::Error`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum Error {
    /// Error from the underlying isahc client.
    Client(isahc::Error),
    /// A length limit was exceeded.
    ///
    /// Displays as a `content-length` that exceeds the limit when `content_length` is `Some`,
    /// otherwise displays as a download that reached the limit.
    MaxLength {
        /// The content length that exceeds the limit, if there is one to report.
        content_length: Option<ByteLength>,
        /// The limit that was exceeded or reached.
        max_length: ByteLength,
    },
    /// A content length is required, displays as "content-length is required".
    RequireLength,
}
1734impl StdError for Error {
1735 fn source(&self) -> Option<&(dyn StdError + 'static)> {
1736 match self {
1737 Error::Client(e) => Some(e),
1738 _ => None,
1739 }
1740 }
1741}
1742impl From<isahc::Error> for Error {
1743 fn from(e: isahc::Error) -> Self {
1744 if let Some(e) = e
1745 .source()
1746 .and_then(|e| e.downcast_ref::<std::io::Error>())
1747 .and_then(|e| e.get_ref())
1748 {
1749 if let Some(e) = e.downcast_ref::<MaxLengthError>() {
1750 return Error::MaxLength {
1751 content_length: e.0,
1752 max_length: e.1,
1753 };
1754 }
1755 if e.downcast_ref::<RequireLengthError>().is_some() {
1756 return Error::RequireLength;
1757 }
1758 }
1759 Error::Client(e)
1760 }
1761}
1762impl From<isahc::http::Error> for Error {
1763 fn from(e: isahc::http::Error) -> Self {
1764 isahc::Error::from(e).into()
1765 }
1766}
1767impl From<std::io::Error> for Error {
1768 fn from(e: std::io::Error) -> Self {
1769 isahc::Error::from(e).into()
1770 }
1771}
1772impl fmt::Display for Error {
1773 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1774 match self {
1775 Error::Client(e) => write!(f, "{e}"),
1776 Error::MaxLength {
1777 content_length,
1778 max_length,
1779 } => write!(f, "{}", MaxLengthError(*content_length, *max_length)),
1780 Error::RequireLength => write!(f, "{RequireLengthError}"),
1781 }
1782 }
1783}
1784
1785#[derive(Debug)]
1788struct MaxLengthError(Option<ByteLength>, ByteLength);
1789impl fmt::Display for MaxLengthError {
1790 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1791 if let Some(l) = self.0 {
1792 write!(f, "content-length of {l} exceeds limit of {}", self.1)
1793 } else {
1794 write!(f, "download reached limit of {}", self.1)
1795 }
1796 }
1797}
1798impl StdError for MaxLengthError {}
1799
/// Private error for a required content length, becomes [`Error::RequireLength`].
#[derive(Debug)]
struct RequireLengthError;
impl fmt::Display for RequireLengthError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // fixed message, formatter options such as width are not applied.
        f.write_str("content-length is required")
    }
}
impl StdError for RequireLengthError {}