1use std::{
8 fmt,
9 io::ErrorKind,
10 pin::Pin,
11 sync::Arc,
12 task::{self, Poll},
13 time::Duration,
14};
15
16use crate::{McWaker, Progress};
17
18#[doc(no_inline)]
19pub use futures_lite::io::{
20 AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BoxedReader, BoxedWriter,
21 BufReader, BufWriter, Cursor, ReadHalf, WriteHalf, copy, empty, repeat, sink, split,
22};
23use parking_lot::Mutex;
24use std::io::{Error, Result};
25use zng_time::{DInstant, INSTANT};
26use zng_txt::formatx;
27use zng_unit::{ByteLength, ByteUnits};
28use zng_var::impl_from_and_into_var;
29
30pub struct Measure<T> {
35 task: T,
36 metrics: Metrics,
37 start_time: DInstant,
38 last_write: DInstant,
39 last_read: DInstant,
40}
41impl<T> Measure<T> {
42 pub fn start(task: T, total_read: impl Into<ByteLength>, total_write: impl Into<ByteLength>) -> Self {
44 Self::resume(task, (0, total_read), (0, total_write))
45 }
46
47 pub fn resume(
49 task: T,
50 read_progress: (impl Into<ByteLength>, impl Into<ByteLength>),
51 write_progress: (impl Into<ByteLength>, impl Into<ByteLength>),
52 ) -> Self {
53 let now = INSTANT.now();
54 Measure {
55 task,
56 metrics: Metrics {
57 read_progress: (read_progress.0.into(), read_progress.1.into()),
58 read_speed: 0.bytes(),
59 write_progress: (write_progress.0.into(), write_progress.1.into()),
60 write_speed: 0.bytes(),
61 total_time: Duration::ZERO,
62 },
63 start_time: now,
64 last_write: now,
65 last_read: now,
66 }
67 }
68
69 pub fn metrics(&mut self) -> &Metrics {
73 &self.metrics
74 }
75
76 pub fn finish(mut self) -> (T, Metrics) {
78 self.metrics.total_time = self.start_time.elapsed();
79 (self.task, self.metrics)
80 }
81}
82
83fn bytes_per_sec(bytes: ByteLength, elapsed: Duration) -> ByteLength {
84 let bytes_per_sec = bytes.0 as u128 / elapsed.as_nanos() / Duration::from_secs(1).as_nanos();
85 ByteLength(bytes_per_sec as usize)
86}
87
88impl<T: AsyncRead + Unpin> AsyncRead for Measure<T> {
89 fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
90 let self_ = self.get_mut();
91 match Pin::new(&mut self_.task).poll_read(cx, buf) {
92 Poll::Ready(Ok(bytes)) => {
93 if bytes > 0 {
94 let bytes = bytes.bytes();
95 self_.metrics.read_progress.0 += bytes;
96
97 let now = INSTANT.now();
98 let elapsed = now - self_.last_read;
99
100 self_.last_read = now;
101 self_.metrics.read_speed = bytes_per_sec(bytes, elapsed);
102
103 self_.metrics.total_time = now - self_.start_time;
104 }
105 Poll::Ready(Ok(bytes))
106 }
107 p => p,
108 }
109 }
110}
111impl<T: AsyncWrite + Unpin> AsyncWrite for Measure<T> {
112 fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
113 let self_ = self.get_mut();
114 match Pin::new(&mut self_.task).poll_write(cx, buf) {
115 Poll::Ready(Ok(bytes)) => {
116 if bytes > 0 {
117 let bytes = bytes.bytes();
118 self_.metrics.write_progress.0 += bytes;
119
120 let now = INSTANT.now();
121 let elapsed = now - self_.last_write;
122
123 self_.last_write = now;
124 self_.metrics.write_speed = bytes_per_sec(bytes, elapsed);
125
126 self_.metrics.total_time = now - self_.start_time;
127 }
128 Poll::Ready(Ok(bytes))
129 }
130 p => p,
131 }
132 }
133
134 fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<()>> {
135 Pin::new(&mut self.get_mut().task).poll_flush(cx)
136 }
137
138 fn poll_close(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<()>> {
139 Pin::new(&mut self.get_mut().task).poll_close(cx)
140 }
141}
142
143#[derive(Debug, Clone, PartialEq, Eq)]
147#[non_exhaustive]
148pub struct Metrics {
149 pub read_progress: (ByteLength, ByteLength),
151
152 pub read_speed: ByteLength,
154
155 pub write_progress: (ByteLength, ByteLength),
157
158 pub write_speed: ByteLength,
160
161 pub total_time: Duration,
164}
165impl Metrics {
166 pub fn zero() -> Self {
168 Self {
169 read_progress: (0.bytes(), 0.bytes()),
170 read_speed: 0.bytes(),
171 write_progress: (0.bytes(), 0.bytes()),
172 write_speed: 0.bytes(),
173 total_time: Duration::ZERO,
174 }
175 }
176}
177impl fmt::Display for Metrics {
178 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
179 let mut w = false;
180 if self.read_progress.1 > 0.bytes() {
181 w = true;
182 if self.read_progress.0 != self.read_progress.1 {
183 write!(f, "↓ {}-{}, {}/s", self.read_progress.0, self.read_progress.1, self.read_speed)?;
184 w = true;
185 } else {
186 write!(f, "↓ {} . {:?}", self.read_progress.0, self.total_time)?;
187 }
188 }
189 if self.write_progress.1 > 0.bytes() {
190 if w {
191 writeln!(f)?;
192 }
193 if self.write_progress.0 != self.write_progress.1 {
194 write!(f, "↑ {} - {}, {}/s", self.write_progress.0, self.write_progress.1, self.write_speed)?;
195 } else {
196 write!(f, "↑ {} . {:?}", self.write_progress.0, self.total_time)?;
197 }
198 }
199
200 Ok(())
201 }
202}
203impl_from_and_into_var! {
204 fn from(metrics: Metrics) -> Progress {
205 let mut status = Progress::indeterminate();
206 if metrics.read_progress.1 > 0.bytes() {
207 status = Progress::from_n_of(metrics.read_progress.0.0, metrics.read_progress.1.0);
208 }
209 if metrics.write_progress.1 > 0.bytes() {
210 let w_status = Progress::from_n_of(metrics.write_progress.0.0, metrics.write_progress.1.0);
211 if status.is_indeterminate() {
212 status = w_status;
213 } else {
214 status = status.and_fct(w_status.fct());
215 }
216 }
217 status.with_msg(formatx!("{metrics}")).with_meta_mut(|mut m| {
218 m.set(*METRICS_ID, metrics);
219 })
220 }
221}
222
zng_state_map::static_id! {
    /// State ID under which the source [`Metrics`] value is stored in the metadata of a
    /// `Progress` converted from metrics.
    pub static ref METRICS_ID: zng_state_map::StateId<Metrics>;
}
227
/// Extension methods for [`std::io::Error`] values returned by [`McBufReader`].
pub trait McBufErrorExt {
    /// Returns `true` if this is the error a lazy [`McBufReader`] returns when the buffer is
    /// exhausted and there are no non-lazy readers left to read more from the source.
    fn is_only_lazy_left(&self) -> bool;
}
impl McBufErrorExt for std::io::Error {
    fn is_only_lazy_left(&self) -> bool {
        if self.kind() != ErrorKind::Other {
            return false;
        }

        // The error is identified by its message, searched in the debug representation.
        let debug_repr = format!("{self:?}");
        debug_repr.contains(ONLY_NON_LAZY_ERROR_MSG)
    }
}
/// Message of the error returned to lazy readers when only lazy readers are left.
const ONLY_NON_LAZY_ERROR_MSG: &str = "no non-lazy readers left to read";
242
243pub struct McBufReader<S: AsyncRead> {
265 inner: Arc<Mutex<McBufInner<S>>>,
266 index: usize,
267 lazy: bool,
268}
269struct McBufInner<S: AsyncRead> {
270 source: Option<S>,
271 waker: McWaker,
272 lazy_wakers: Vec<task::Waker>,
273
274 buf: Vec<u8>,
275
276 clones: Vec<usize>,
277 non_lazy_count: usize,
278
279 result: ReadState,
280}
281impl<S: AsyncRead> McBufReader<S> {
282 pub fn new(source: S) -> Self {
284 let mut clones = Vec::with_capacity(2);
285 clones.push(0);
286 McBufReader {
287 inner: Arc::new(Mutex::new(McBufInner {
288 source: Some(source),
289 waker: McWaker::empty(),
290 lazy_wakers: vec![],
291
292 buf: Vec::with_capacity(10.kilobytes().0),
293
294 clones,
295 non_lazy_count: 1,
296
297 result: ReadState::Running,
298 })),
299 index: 0,
300 lazy: false,
301 }
302 }
303
304 pub fn is_lazy(&self) -> bool {
308 self.lazy
309 }
310
311 pub fn set_lazy(&mut self, lazy: bool) {
315 if self.lazy != lazy {
316 if lazy {
317 self.inner.lock().non_lazy_count -= 1;
318 } else {
319 self.inner.lock().non_lazy_count += 1;
320 }
321 self.lazy = lazy;
322 }
323 }
324}
325impl<S: AsyncRead> Clone for McBufReader<S> {
326 fn clone(&self) -> Self {
327 let mut inner = self.inner.lock();
328
329 let offset = inner.clones[self.index];
330 let index = inner.clones.len();
331 inner.clones.push(offset);
332
333 if !self.lazy {
334 inner.non_lazy_count += 1;
335 }
336
337 Self {
338 inner: self.inner.clone(),
339 index,
340 lazy: self.lazy,
341 }
342 }
343}
344impl<S: AsyncRead> Drop for McBufReader<S> {
345 fn drop(&mut self) {
346 let mut inner = self.inner.lock();
347 inner.clones[self.index] = usize::MAX;
348 if !self.lazy {
349 inner.non_lazy_count -= 1;
350 if inner.non_lazy_count == 0 {
351 for waker in inner.lazy_wakers.drain(..) {
353 waker.wake();
354 }
355 }
356 }
357 }
358}
359impl<S: AsyncRead> AsyncRead for McBufReader<S> {
360 fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
361 let self_ = self.as_ref();
362 let mut inner = self_.inner.lock();
363 let inner = &mut *inner;
364
365 let mut i = inner.clones[self_.index];
367 let mut ready;
368
369 match &inner.result {
370 ReadState::Running => {
371 ready = &inner.buf[i..];
374
375 if ready.is_empty() {
376 if self.lazy {
377 if inner.non_lazy_count == 0 {
378 return Poll::Ready(Err(Error::other(ONLY_NON_LAZY_ERROR_MSG)));
380 } else {
381 inner.lazy_wakers.push(cx.waker().clone());
383
384 return Poll::Pending;
386 }
387 }
388
389 ready = &[];
392
393 let waker = match inner.waker.push(cx.waker().clone()) {
394 Some(w) => w,
395 None => {
396 return Poll::Pending;
398 }
399 };
400
401 let min_i = inner.clones.iter().copied().min().unwrap();
402 if min_i > 0 {
403 inner.buf.copy_within(min_i.., 0);
405 inner.buf.truncate(inner.buf.len() - min_i);
406
407 i -= min_i;
408 for i in &mut inner.clones {
409 *i -= min_i;
410 }
411 }
412
413 let new_start = inner.buf.len();
414
415 inner.buf.resize(inner.buf.len() + buf.len().max(10.kilobytes().0), 0);
416
417 let mut inner_cx = task::Context::from_waker(&waker);
418
419 let source = unsafe { Pin::new_unchecked(inner.source.as_mut().unwrap()) };
421 let result = source.poll_read(&mut inner_cx, &mut inner.buf[new_start..]);
422
423 match result {
424 Poll::Ready(result) => {
425 for waker in inner.lazy_wakers.drain(..) {
427 waker.wake();
428 }
429
430 match result {
431 Ok(0) => {
432 inner.waker.cancel();
433
434 inner.buf.truncate(new_start);
436 inner.result = ReadState::Eof;
437 inner.source = None;
438
439 }
441 Ok(read) => {
442 inner.waker.cancel();
443
444 inner.buf.truncate(new_start + read);
446 ready = &inner.buf[i..];
447
448 }
450 Err(e) => {
451 inner.waker.cancel();
452
453 inner.result = ReadState::Err(CloneableError::new(&e));
455 inner.buf = vec![];
456 inner.source = None;
457
458 return Poll::Ready(Err(e));
459 }
460 }
461 }
462
463 Poll::Pending => {
464 inner.buf.truncate(new_start);
465 return Poll::Pending;
466 }
467 }
468 }
469 }
470 ReadState::Eof => {
471 ready = &inner.buf[i..];
472
473 }
475 ReadState::Err(e) => return Poll::Ready(e.err()),
476 }
477
478 let max_ready = buf.len().min(ready.len());
481 buf[..max_ready].copy_from_slice(&ready[..max_ready]);
482
483 i += max_ready;
484 inner.clones[self_.index] = i;
485
486 Poll::Ready(Ok(max_ready))
487 }
488}
489
/// Cloneable snapshot of a [`std::io::Error`].
///
/// The snapshot stores the raw OS error code when the error has one, otherwise it stores the
/// error kind and the formatted error message. It can be converted back to an IO error any
/// number of times.
#[derive(Clone)]
pub struct CloneableError {
    info: ErrorInfo,
}
/// Data captured from the source error.
#[derive(Clone)]
enum ErrorInfo {
    /// Raw OS error code.
    OsError(i32),
    /// Kind and display message of an error that has no OS code.
    Other(ErrorKind, String),
}
impl CloneableError {
    /// Captures the OS code of `e`, or its kind and message if it has no OS code.
    pub fn new(e: &Error) -> Self {
        let info = match e.raw_os_error() {
            Some(code) => ErrorInfo::OsError(code),
            None => ErrorInfo::Other(e.kind(), e.to_string()),
        };

        Self { info }
    }

    /// Creates an `Err` result with a new IO error rebuilt from the captured data.
    pub fn err<T>(&self) -> Result<T> {
        Err(Error::from(self.clone()))
    }
}
impl From<CloneableError> for Error {
    /// Rebuilds an IO error from the captured OS code or from the captured kind and message.
    fn from(e: CloneableError) -> Self {
        let CloneableError { info } = e;
        match info {
            ErrorInfo::OsError(code) => Error::from_raw_os_error(code),
            ErrorInfo::Other(kind, msg) => Error::new(kind, msg),
        }
    }
}
534
535pub struct ReadLimited<S, L> {
537 source: S,
538 limit: usize,
539 on_limit: L,
540}
541impl<S, L> ReadLimited<S, L>
542where
543 S: AsyncRead,
544 L: Fn() -> std::io::Error,
545{
546 pub fn new(source: S, limit: ByteLength, on_limit: L) -> Self {
550 Self {
551 source,
552 limit: limit.0,
553 on_limit,
554 }
555 }
556}
557impl<S, L> AsyncRead for ReadLimited<S, L>
558where
559 S: AsyncRead,
560 L: Fn() -> std::io::Error,
561{
562 fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, mut buf: &mut [u8]) -> Poll<Result<usize>> {
563 let self_ = unsafe { self.get_unchecked_mut() };
565
566 if self_.limit == 0 {
567 let err = (self_.on_limit)();
568 return Poll::Ready(Err(err));
569 }
570
571 if buf.len() > self_.limit {
572 buf = &mut buf[..self_.limit];
573 }
574
575 match unsafe { Pin::new_unchecked(&mut self_.source) }.poll_read(cx, buf) {
577 Poll::Ready(Ok(l)) => {
578 self_.limit = self_.limit.saturating_sub(l);
579 if self_.limit == 0 {
580 let err = (self_.on_limit)();
581 Poll::Ready(Err(err))
582 } else {
583 Poll::Ready(Ok(l))
584 }
585 }
586 r => r,
587 }
588 }
589}
590
/// State of the source shared by the clones of a [`McBufReader`].
enum ReadState {
    /// The source can still be polled for more bytes.
    Running,
    /// The source returned `Ok(0)`, readers can only consume what is left in the buffer.
    Eof,
    /// The source failed, every subsequent read returns a copy of this error.
    Err(CloneableError),
}
596
597#[cfg(test)]
598mod tests {
599 use super::*;
600 use crate as task;
601 use zng_unit::TimeUnits;
602
603 #[test]
604 pub fn mc_buf_reader_parallel() {
605 let data = Data::new(60.kilobytes().0);
606
607 let mut expected = vec![0; data.len];
608 let _ = data.clone().blocking_read(&mut expected[..]);
609
610 let mut a = McBufReader::new(data);
611 let mut b = a.clone();
612 let mut c = a.clone();
613
614 let (a, b, c) = async_test(async move {
615 let a = task::run(async move {
616 let mut buf = vec![];
617 a.read_to_end(&mut buf).await.unwrap();
618 buf
619 });
620 let b = task::run(async move {
621 let mut buf: Vec<u8> = vec![];
622 b.read_to_end(&mut buf).await.unwrap();
623 buf
624 });
625 let c = task::run(async move {
626 let mut buf: Vec<u8> = vec![];
627 c.read_to_end(&mut buf).await.unwrap();
628 buf
629 });
630
631 task::all!(a, b, c).await
632 });
633
634 crate::assert_vec_eq!(expected, a);
635 crate::assert_vec_eq!(expected, b);
636 crate::assert_vec_eq!(expected, c);
637 }
638
639 #[test]
640 pub fn mc_buf_reader_single() {
641 let data = Data::new(60.kilobytes().0);
642
643 let mut expected = vec![0; data.len];
644 let _ = data.clone().blocking_read(&mut expected[..]);
645
646 let mut a = McBufReader::new(data);
647
648 let a = async_test(async move {
649 let a = task::run(async move {
650 let mut buf = vec![];
651 a.read_to_end(&mut buf).await.unwrap();
652 buf
653 });
654
655 a.await
656 });
657
658 crate::assert_vec_eq!(expected, a);
659 }
660
661 #[test]
662 pub fn mc_buf_reader_sequential() {
663 let data = Data::new(60.kilobytes().0);
664
665 let mut expected = vec![0; data.len];
666 let _ = data.clone().blocking_read(&mut expected[..]);
667
668 let mut clones = vec![McBufReader::new(data)];
669 for _ in 0..5 {
670 clones.push(clones[0].clone());
671 }
672
673 let r = async_test(async move {
674 let mut r = vec![];
675
676 for mut clone in clones {
677 let mut buf = vec![];
678 clone.read_to_end(&mut buf).await.unwrap();
679 r.push(buf);
680 }
681
682 r
683 });
684
685 for r in r {
686 crate::assert_vec_eq!(expected, r);
687 }
688 }
689
690 #[test]
691 pub fn mc_buf_reader_completed() {
692 let data = Data::new(60.kilobytes().0);
693 let mut buf = Vec::with_capacity(data.len);
694 let mut a = McBufReader::new(data);
695
696 let r = async_test(async move {
697 a.read_to_end(&mut buf).await.unwrap();
698
699 let mut b = a.clone();
700 buf.clear();
701
702 b.read_to_end(&mut buf).await.unwrap();
703 buf.len()
704 });
705
706 assert_eq!(0, r);
707 }
708
709 #[test]
710 pub fn mc_buf_reader_error() {
711 let mut data = Data::new(20.kilobytes().0);
712 data.set_error();
713
714 let mut expected = vec![0; data.len];
715 let _ = data.clone().blocking_read(&mut expected[..]);
716
717 let mut a = McBufReader::new(data);
718 let mut b = a.clone();
719
720 let (a, b) = async_test(async move {
721 let a = task::run(async move {
722 let mut buf = vec![];
723 a.read_to_end(&mut buf).await.unwrap_err()
724 });
725 let b = task::run(async move {
726 let mut buf: Vec<u8> = vec![];
727 b.read_to_end(&mut buf).await.unwrap_err()
728 });
729
730 task::all!(a, b).await
731 });
732
733 assert_eq!(ErrorKind::InvalidData, a.kind());
734 assert_eq!(ErrorKind::InvalidData, b.kind());
735 }
736
737 #[test]
738 pub fn mc_buf_reader_error_completed() {
739 let mut data = Data::new(20.kilobytes().0);
740 data.set_error();
741
742 let mut buf = Vec::with_capacity(data.len);
743 let mut a = McBufReader::new(data);
744
745 let (a, b) = async_test(async move {
746 let a_err = a.read_to_end(&mut buf).await.unwrap_err();
747
748 let mut b = a.clone();
749 buf.clear();
750
751 let b_err = b.read_to_end(&mut buf).await.unwrap_err();
752
753 (a_err, b_err)
754 });
755
756 assert_eq!(ErrorKind::InvalidData, a.kind());
757 assert_eq!(ErrorKind::InvalidData, b.kind());
758 }
759
760 #[test]
761 pub fn mc_buf_reader_parallel_with_delay1() {
762 let mut data = Data::new(60.kilobytes().0);
763 data.enable_pending();
764
765 let mut expected = vec![0; data.len];
766 let _ = data.clone().blocking_read(&mut expected[..]);
767
768 let mut a = McBufReader::new(data);
769 let mut b = a.clone();
770 let mut c = a.clone();
771
772 let (a, b, c) = async_test(async move {
773 let a = task::run(async move {
774 let mut buf = vec![];
775 a.read_to_end(&mut buf).await.unwrap();
776 buf
777 });
778 let b = task::run(async move {
779 let mut buf: Vec<u8> = vec![];
780 b.read_to_end(&mut buf).await.unwrap();
781 buf
782 });
783 let c = task::run(async move {
784 let mut buf: Vec<u8> = vec![];
785 c.read_to_end(&mut buf).await.unwrap();
786 buf
787 });
788
789 task::all!(a, b, c).await
790 });
791
792 crate::assert_vec_eq!(expected, a);
793 crate::assert_vec_eq!(expected, b);
794 crate::assert_vec_eq!(expected, c);
795 }
796
797 #[test]
798 pub fn mc_buf_reader_parallel_with_delay2() {
799 let mut data = Data::new(60.kilobytes().0);
800 data.enable_pending();
801
802 let mut expected = vec![0; data.len];
803 let _ = data.clone().blocking_read(&mut expected[..]);
804
805 let mut a = McBufReader::new(data);
806 let mut b = a.clone();
807 let mut c = a.clone();
808
809 let (a, b, c) = async_test(async move {
810 let a = task::run(async move {
811 let mut buf = vec![];
812 a.read_to_end(&mut buf).await.unwrap();
813 buf
814 });
815 let b = task::run(async move {
816 let mut buf: Vec<u8> = vec![];
817 task::deadline(5.ms()).await;
818 b.read_to_end(&mut buf).await.unwrap();
819 buf
820 });
821 let c = task::run(async move {
822 let mut buf: Vec<u8> = vec![];
823 c.read_to_end(&mut buf).await.unwrap();
824 buf
825 });
826
827 task::all!(a, b, c).await
828 });
829
830 crate::assert_vec_eq!(expected, a);
831 crate::assert_vec_eq!(expected, b);
832 crate::assert_vec_eq!(expected, c);
833 }
834
835 #[derive(Clone)]
836 struct Data {
837 b: u8,
838 len: usize,
839 error: Option<CloneableError>,
840 delay: Duration,
841 pending: bool,
842 }
843 impl Data {
844 pub fn new(len: usize) -> Self {
845 Self {
846 b: 0,
847 len,
848 error: None,
849 delay: 0.ms(),
850 pending: false,
851 }
852 }
853 pub fn blocking_read(&mut self, buf: &mut [u8]) -> Result<usize> {
854 let len = self.len;
855 for b in buf.iter_mut().take(len) {
856 *b = self.b;
857 self.len -= 1;
858 self.b = self.b.wrapping_add(1);
859 }
860
861 if len == 0
862 && let Some(e) = &self.error
863 {
864 return e.err();
865 }
866
867 Ok(buf.len().min(len))
868 }
869 pub fn set_error(&mut self) {
870 self.error = Some(CloneableError::new(&Error::new(ErrorKind::InvalidData, "test error")));
871 }
872
873 pub fn enable_pending(&mut self) {
874 self.delay = 3.ms();
875 }
876 }
877 impl AsyncRead for Data {
878 fn poll_read(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
879 if self.delay > Duration::ZERO {
880 self.pending = !self.pending;
881 if self.pending {
882 let waker = cx.waker().clone();
883 let delay = self.delay;
884 task::spawn(async move {
885 task::deadline(delay).await;
886 waker.wake();
887 });
888 return Poll::Pending;
889 }
890 }
891
892 let r = self.as_mut().blocking_read(buf);
893 Poll::Ready(r)
894 }
895 }
896
897 #[track_caller]
898 fn async_test<F>(test: F) -> F::Output
899 where
900 F: Future,
901 {
902 task::block_on(task::with_deadline(test, 5.secs())).unwrap()
903 }
904
905 #[macro_export]
907 macro_rules! assert_vec_eq {
908 ($a:expr, $b: expr) => {
909 match (&$a, &$b) {
910 (ref a, ref b) => {
911 let len_not_eq = a.len() != b.len();
912 let mut data_not_eq = None;
913 for (i, (a, b)) in a.iter().zip(b.iter()).enumerate() {
914 if a != b {
915 data_not_eq = Some(i);
916 break;
917 }
918 }
919
920 if len_not_eq || data_not_eq.is_some() {
921 use std::fmt::*;
922
923 let mut error = format!("`{}` != `{}`", stringify!($a), stringify!($b));
924 if len_not_eq {
925 let _ = write!(&mut error, "\n lengths not equal: {} != {}", a.len(), b.len());
926 }
927 if let Some(i) = data_not_eq {
928 let _ = write!(&mut error, "\n data not equal at index {}: {} != {:?}", i, a[i], b[i]);
929 }
930 panic!("{error}")
931 }
932 }
933 }
934 };
935 }
936}