1use std::{
8 fmt,
9 io::ErrorKind,
10 pin::Pin,
11 sync::Arc,
12 task::{self, Poll},
13 time::Duration,
14};
15
16use crate::{McWaker, Progress};
17
18#[doc(no_inline)]
19pub use futures_lite::io::{
20 AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BoxedReader, BoxedWriter,
21 BufReader, BufWriter, Cursor, ReadHalf, WriteHalf, copy, empty, repeat, sink, split,
22};
23use parking_lot::Mutex;
24use std::io::{Error, Result};
25use zng_time::{DInstant, INSTANT};
26use zng_txt::formatx;
27use zng_unit::{ByteLength, ByteUnits};
28use zng_var::impl_from_and_into_var;
29
/// Wraps an I/O task and keeps read/write [`Metrics`] for it.
///
/// The `AsyncRead` and `AsyncWrite` implementations forward to the wrapped task and
/// update the metrics after every successful, non-empty read or write.
pub struct Measure<T> {
    /// The wrapped I/O task.
    task: T,
    /// Metrics accumulated so far.
    metrics: Metrics,
    /// Instant the measurement was created, `total_time` is computed from it.
    start_time: DInstant,
    /// Instant of the most recent non-empty write, used to compute the write speed.
    last_write: DInstant,
    /// Instant of the most recent non-empty read, used to compute the read speed.
    last_read: DInstant,
}
41impl<T> Measure<T> {
42 pub fn start(task: T, total_read: impl Into<ByteLength>, total_write: impl Into<ByteLength>) -> Self {
44 Self::resume(task, (0, total_read), (0, total_write))
45 }
46
47 pub fn resume(
49 task: T,
50 read_progress: (impl Into<ByteLength>, impl Into<ByteLength>),
51 write_progress: (impl Into<ByteLength>, impl Into<ByteLength>),
52 ) -> Self {
53 let now = INSTANT.now();
54 Measure {
55 task,
56 metrics: Metrics {
57 read_progress: (read_progress.0.into(), read_progress.1.into()),
58 read_speed: 0.bytes(),
59 write_progress: (write_progress.0.into(), write_progress.1.into()),
60 write_speed: 0.bytes(),
61 total_time: Duration::ZERO,
62 },
63 start_time: now,
64 last_write: now,
65 last_read: now,
66 }
67 }
68
69 pub fn metrics(&mut self) -> &Metrics {
73 &self.metrics
74 }
75
76 pub fn finish(mut self) -> (T, Metrics) {
78 self.metrics.total_time = self.start_time.elapsed();
79 (self.task, self.metrics)
80 }
81}
82
83fn bytes_per_sec(bytes: ByteLength, elapsed: Duration) -> ByteLength {
84 let bytes_per_sec = bytes.0 as u128 / elapsed.as_nanos() / Duration::from_secs(1).as_nanos();
85 ByteLength(bytes_per_sec as usize)
86}
87
88impl<T: AsyncRead + Unpin> AsyncRead for Measure<T> {
89 fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
90 let self_ = self.get_mut();
91 match Pin::new(&mut self_.task).poll_read(cx, buf) {
92 Poll::Ready(Ok(bytes)) => {
93 if bytes > 0 {
94 let bytes = bytes.bytes();
95 self_.metrics.read_progress.0 += bytes;
96
97 let now = INSTANT.now();
98 let elapsed = now - self_.last_read;
99
100 self_.last_read = now;
101 self_.metrics.read_speed = bytes_per_sec(bytes, elapsed);
102
103 self_.metrics.total_time = now - self_.start_time;
104 }
105 Poll::Ready(Ok(bytes))
106 }
107 p => p,
108 }
109 }
110}
111impl<T: AsyncWrite + Unpin> AsyncWrite for Measure<T> {
112 fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
113 let self_ = self.get_mut();
114 match Pin::new(&mut self_.task).poll_write(cx, buf) {
115 Poll::Ready(Ok(bytes)) => {
116 if bytes > 0 {
117 let bytes = bytes.bytes();
118 self_.metrics.write_progress.0 += bytes;
119
120 let now = INSTANT.now();
121 let elapsed = now - self_.last_write;
122
123 self_.last_write = now;
124 self_.metrics.write_speed = bytes_per_sec(bytes, elapsed);
125
126 self_.metrics.total_time = now - self_.start_time;
127 }
128 Poll::Ready(Ok(bytes))
129 }
130 p => p,
131 }
132 }
133
134 fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<()>> {
135 Pin::new(&mut self.get_mut().task).poll_flush(cx)
136 }
137
138 fn poll_close(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<()>> {
139 Pin::new(&mut self.get_mut().task).poll_close(cx)
140 }
141}
142
/// Information about the state of an I/O task tracked by [`Measure`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Metrics {
    /// Read progress as `(bytes read so far, expected total)`.
    pub read_progress: (ByteLength, ByteLength),

    /// Read speed computed from the most recent non-empty read.
    pub read_speed: ByteLength,

    /// Write progress as `(bytes written so far, expected total)`.
    pub write_progress: (ByteLength, ByteLength),

    /// Write speed computed from the most recent non-empty write.
    pub write_speed: ByteLength,

    /// Time elapsed since the measurement was created, refreshed on every
    /// non-empty read or write and when the measurement is finished.
    pub total_time: Duration,
}
164impl Metrics {
165 pub fn zero() -> Self {
167 Self {
168 read_progress: (0.bytes(), 0.bytes()),
169 read_speed: 0.bytes(),
170 write_progress: (0.bytes(), 0.bytes()),
171 write_speed: 0.bytes(),
172 total_time: Duration::ZERO,
173 }
174 }
175}
176impl fmt::Display for Metrics {
177 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178 let mut w = false;
179 if self.read_progress.1 > 0.bytes() {
180 w = true;
181 if self.read_progress.0 != self.read_progress.1 {
182 write!(f, "↓ {}-{}, {}/s", self.read_progress.0, self.read_progress.1, self.read_speed)?;
183 w = true;
184 } else {
185 write!(f, "↓ {} . {:?}", self.read_progress.0, self.total_time)?;
186 }
187 }
188 if self.write_progress.1 > 0.bytes() {
189 if w {
190 writeln!(f)?;
191 }
192 if self.write_progress.0 != self.write_progress.1 {
193 write!(f, "↑ {} - {}, {}/s", self.write_progress.0, self.write_progress.1, self.write_speed)?;
194 } else {
195 write!(f, "↑ {} . {:?}", self.write_progress.0, self.total_time)?;
196 }
197 }
198
199 Ok(())
200 }
201}
202impl_from_and_into_var! {
203 fn from(metrics: Metrics) -> Progress {
204 let mut status = Progress::indeterminate();
205 if metrics.read_progress.1 > 0.bytes() {
206 status = Progress::from_n_of(metrics.read_progress.0 .0, metrics.read_progress.1 .0);
207 }
208 if metrics.write_progress.1 > 0.bytes() {
209 let w_status = Progress::from_n_of(metrics.write_progress.0 .0, metrics.write_progress.1 .0);
210 if status.is_indeterminate() {
211 status = w_status;
212 } else {
213 status = status.and_fct(w_status.fct());
214 }
215 }
216 status.with_msg(formatx!("{metrics}")).with_meta_mut(|mut m| {
217 m.set(*METRICS_ID, metrics);
218 })
219 }
220}
221
// State ID of the `Metrics` value that the `Metrics` to `Progress` conversion
// stores in the progress metadata.
zng_state_map::static_id! {
    pub static ref METRICS_ID: zng_state_map::StateId<Metrics>;
}
226
/// Extension methods for [`std::io::Error`] values returned by a multi-consumer buffered reader.
pub trait McBufErrorExt {
    /// Returns `true` if this is the error a lazy reader gets when it has no buffered data
    /// left and no non-lazy reader remains to read more from the source.
    fn is_only_lazy_left(&self) -> bool;
}
impl McBufErrorExt for std::io::Error {
    fn is_only_lazy_left(&self) -> bool {
        if !matches!(self.kind(), ErrorKind::Other) {
            return false;
        }

        // The message is searched in the debug representation of the error.
        let repr = format!("{:?}", self);
        repr.contains(ONLY_NON_LAZY_ERROR_MSG)
    }
}
/// Message of the error identified by [`McBufErrorExt::is_only_lazy_left`].
const ONLY_NON_LAZY_ERROR_MSG: &str = "no non-lazy readers left to read";
241
/// Multi-consumer buffered reader, every clone reads from the same shared source and buffer.
///
/// A clone starts at the same buffer offset as the reader it was cloned from. A *lazy*
/// reader never polls the source itself, it only consumes what non-lazy readers buffered.
pub struct McBufReader<S: AsyncRead> {
    /// State shared by all clones.
    inner: Arc<Mutex<McBufInner<S>>>,
    /// Index of this reader's offset in the shared `clones` list.
    index: usize,
    /// If this reader is lazy.
    lazy: bool,
}
/// State shared by all clones of a [`McBufReader`].
struct McBufInner<S: AsyncRead> {
    /// The source reader, set to `None` after it reaches EOF or returns an error.
    source: Option<S>,
    /// Collects the wakers of non-lazy readers that polled with no buffered data left.
    waker: McWaker,
    /// Wakers of lazy readers that are waiting for a non-lazy reader to buffer more data.
    lazy_wakers: Vec<task::Waker>,

    /// Bytes read from the source that at least one reader may still need.
    buf: Vec<u8>,

    /// Offset in `buf` of each reader, a dropped reader is marked with `usize::MAX`.
    clones: Vec<usize>,
    /// Number of live readers that are not lazy.
    non_lazy_count: usize,

    /// If the source is still being read, has finished or has failed.
    result: ReadState,
}
280impl<S: AsyncRead> McBufReader<S> {
281 pub fn new(source: S) -> Self {
283 let mut clones = Vec::with_capacity(2);
284 clones.push(0);
285 McBufReader {
286 inner: Arc::new(Mutex::new(McBufInner {
287 source: Some(source),
288 waker: McWaker::empty(),
289 lazy_wakers: vec![],
290
291 buf: Vec::with_capacity(10.kilobytes().0),
292
293 clones,
294 non_lazy_count: 1,
295
296 result: ReadState::Running,
297 })),
298 index: 0,
299 lazy: false,
300 }
301 }
302
303 pub fn is_lazy(&self) -> bool {
307 self.lazy
308 }
309
310 pub fn set_lazy(&mut self, lazy: bool) {
314 if self.lazy != lazy {
315 if lazy {
316 self.inner.lock().non_lazy_count -= 1;
317 } else {
318 self.inner.lock().non_lazy_count += 1;
319 }
320 self.lazy = lazy;
321 }
322 }
323}
324impl<S: AsyncRead> Clone for McBufReader<S> {
325 fn clone(&self) -> Self {
326 let mut inner = self.inner.lock();
327
328 let offset = inner.clones[self.index];
329 let index = inner.clones.len();
330 inner.clones.push(offset);
331
332 if !self.lazy {
333 inner.non_lazy_count += 1;
334 }
335
336 Self {
337 inner: self.inner.clone(),
338 index,
339 lazy: self.lazy,
340 }
341 }
342}
343impl<S: AsyncRead> Drop for McBufReader<S> {
344 fn drop(&mut self) {
345 let mut inner = self.inner.lock();
346 inner.clones[self.index] = usize::MAX;
347 if !self.lazy {
348 inner.non_lazy_count -= 1;
349 if inner.non_lazy_count == 0 {
350 for waker in inner.lazy_wakers.drain(..) {
352 waker.wake();
353 }
354 }
355 }
356 }
357}
// Reads from the shared buffer, only a non-lazy reader with no buffered data left polls the source.
impl<S: AsyncRead> AsyncRead for McBufReader<S> {
    fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
        let self_ = self.as_ref();
        let mut inner = self_.inner.lock();
        let inner = &mut *inner;

        // offset of this reader in the shared buffer
        let mut i = inner.clones[self_.index];
        // buffered bytes this reader has not consumed yet
        let mut ready;

        match &inner.result {
            ReadState::Running => {
                ready = &inner.buf[i..];

                if ready.is_empty() {
                    // this reader consumed everything buffered so far

                    if self.lazy {
                        // lazy readers never poll the source

                        if inner.non_lazy_count == 0 {
                            // no reader is left to fill the buffer
                            return Poll::Ready(Err(Error::new(ErrorKind::Other, ONLY_NON_LAZY_ERROR_MSG)));
                        } else {
                            // woken when a non-lazy reader gets a result from the source,
                            // or when the last non-lazy reader is dropped
                            inner.lazy_wakers.push(cx.waker().clone());

                            return Poll::Pending;
                        }
                    }

                    // stop borrowing `inner.buf`, it is modified below
                    ready = &[];

                    // NOTE(review): `McWaker::push` presumably only returns `Some` for the first
                    // waker registered since the last wake, so a single reader polls the source
                    // and the others just wait - confirm against `McWaker`.
                    let waker = match inner.waker.push(cx.waker().clone()) {
                        Some(w) => w,
                        None => {
                            return Poll::Pending;
                        }
                    };

                    // discard the buffer front that all readers already consumed
                    let min_i = inner.clones.iter().copied().min().unwrap();
                    if min_i > 0 {
                        inner.buf.copy_within(min_i.., 0);
                        inner.buf.truncate(inner.buf.len() - min_i);

                        // rebase all offsets, dropped readers (`usize::MAX`) are shifted too
                        i -= min_i;
                        for i in &mut inner.clones {
                            *i -= min_i;
                        }
                    }

                    let new_start = inner.buf.len();

                    // zeroed space for the source to write, at least 10 kilobytes
                    inner.buf.resize(inner.buf.len() + buf.len().max(10.kilobytes().0), 0);

                    let mut inner_cx = task::Context::from_waker(&waker);

                    // NOTE(review): SAFETY relies on the source never being moved after the first
                    // poll, in this file it stays inside the shared `Arc<Mutex<_>>` and is only
                    // dropped in place by assigning `None` - confirm no other code moves it.
                    let source = unsafe { Pin::new_unchecked(inner.source.as_mut().unwrap()) };
                    let result = source.poll_read(&mut inner_cx, &mut inner.buf[new_start..]);

                    match result {
                        Poll::Ready(result) => {
                            // the state changed, lazy readers can try again
                            for waker in inner.lazy_wakers.drain(..) {
                                waker.wake();
                            }

                            match result {
                                Ok(0) => {
                                    // source finished
                                    inner.waker.cancel();

                                    // remove the unused space
                                    inner.buf.truncate(new_start);
                                    inner.result = ReadState::Eof;
                                    inner.source = None;

                                    // `ready` is still empty, this call returns `Ok(0)`
                                }
                                Ok(read) => {
                                    inner.waker.cancel();

                                    // keep only the bytes that were actually read
                                    inner.buf.truncate(new_start + read);
                                    ready = &inner.buf[i..];

                                }
                                Err(e) => {
                                    inner.waker.cancel();

                                    // store a copy of the error for the other readers
                                    inner.result = ReadState::Err(CloneableError::new(&e));
                                    inner.buf = vec![];
                                    inner.source = None;

                                    return Poll::Ready(Err(e));
                                }
                            }
                        }

                        Poll::Pending => {
                            // remove the unused space, `waker` is called when the source is ready
                            inner.buf.truncate(new_start);
                            return Poll::Pending;
                        }
                    }
                }
            }
            ReadState::Eof => {
                // source finished, only the remaining buffered bytes can be read
                ready = &inner.buf[i..];

            }
            // source failed, every reader gets a copy of the error
            ReadState::Err(e) => return Poll::Ready(e.err()),
        }

        // copy as much as fits in `buf` and advance the offset of this reader

        let max_ready = buf.len().min(ready.len());
        buf[..max_ready].copy_from_slice(&ready[..max_ready]);

        i += max_ready;
        inner.clones[self_.index] = i;

        Poll::Ready(Ok(max_ready))
    }
}
488
/// Represents a snapshot of an [`std::io::Error`] that can be cloned.
///
/// An OS error is stored as its raw code, any other error is stored as its
/// kind and display message. The snapshot converts back to an `Error`.
#[derive(Debug, Clone)]
pub struct CloneableError {
    info: ErrorInfo,
}
/// The data kept from the source error.
#[derive(Debug, Clone)]
enum ErrorInfo {
    /// Raw OS error code.
    OsError(i32),
    /// Kind and display message of an error that has no OS code.
    Other(ErrorKind, String),
}
impl CloneableError {
    /// Copies the cloneable information of the error `e`.
    pub fn new(e: &Error) -> Self {
        let info = match e.raw_os_error() {
            Some(code) => ErrorInfo::OsError(code),
            None => ErrorInfo::Other(e.kind(), e.to_string()),
        };

        Self { info }
    }

    /// Returns an `Err` with a new [`std::io::Error`] built from the stored information.
    pub fn err<T>(&self) -> Result<T> {
        Err(self.clone().into())
    }
}
impl From<CloneableError> for Error {
    fn from(e: CloneableError) -> Self {
        match e.info {
            ErrorInfo::OsError(code) => Error::from_raw_os_error(code),
            ErrorInfo::Other(kind, msg) => Error::new(kind, msg),
        }
    }
}
533
/// Wraps a reader and fails the read once a number of bytes has been read from it.
pub struct ReadLimited<S, L> {
    /// The wrapped reader.
    source: S,
    /// Number of bytes that can still be read before the limit is reached.
    limit: usize,
    /// Creates the error returned when the limit is reached.
    on_limit: L,
}
540impl<S, L> ReadLimited<S, L>
541where
542 S: AsyncRead,
543 L: Fn() -> std::io::Error,
544{
545 pub fn new(source: S, limit: ByteLength, on_limit: L) -> Self {
549 Self {
550 source,
551 limit: limit.0,
552 on_limit,
553 }
554 }
555}
556impl<S, L> AsyncRead for ReadLimited<S, L>
557where
558 S: AsyncRead,
559 L: Fn() -> std::io::Error,
560{
561 fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, mut buf: &mut [u8]) -> Poll<Result<usize>> {
562 let self_ = unsafe { self.get_unchecked_mut() };
564
565 if self_.limit == 0 {
566 let err = (self_.on_limit)();
567 return Poll::Ready(Err(err));
568 }
569
570 if buf.len() > self_.limit {
571 buf = &mut buf[..self_.limit];
572 }
573
574 match unsafe { Pin::new_unchecked(&mut self_.source) }.poll_read(cx, buf) {
576 Poll::Ready(Ok(l)) => {
577 self_.limit = self_.limit.saturating_sub(l);
578 if self_.limit == 0 {
579 let err = (self_.on_limit)();
580 Poll::Ready(Err(err))
581 } else {
582 Poll::Ready(Ok(l))
583 }
584 }
585 r => r,
586 }
587 }
588}
589
/// State of the shared source of a [`McBufReader`].
enum ReadState {
    /// The source can still be polled for more data.
    Running,
    /// The source returned a zero-length read, only already buffered data can be read.
    Eof,
    /// The source returned this error, every reader gets a copy of it.
    Err(CloneableError),
}
595
// Tests for `McBufReader`, using the in-memory `Data` source defined at the end of the module.
#[cfg(test)]
mod tests {
    use super::*;
    use crate as task;
    use zng_unit::TimeUnits;

    // Three clones read concurrently, each must get the full data.
    #[test]
    pub fn mc_buf_reader_parallel() {
        let data = Data::new(60.kilobytes().0);

        let mut expected = vec![0; data.len];
        let _ = data.clone().blocking_read(&mut expected[..]);

        let mut a = McBufReader::new(data);
        let mut b = a.clone();
        let mut c = a.clone();

        let (a, b, c) = async_test(async move {
            let a = task::run(async move {
                let mut buf = vec![];
                a.read_to_end(&mut buf).await.unwrap();
                buf
            });
            let b = task::run(async move {
                let mut buf: Vec<u8> = vec![];
                b.read_to_end(&mut buf).await.unwrap();
                buf
            });
            let c = task::run(async move {
                let mut buf: Vec<u8> = vec![];
                c.read_to_end(&mut buf).await.unwrap();
                buf
            });

            task::all!(a, b, c).await
        });

        crate::assert_vec_eq!(expected, a);
        crate::assert_vec_eq!(expected, b);
        crate::assert_vec_eq!(expected, c);
    }

    // A single reader with no clones must get the full data.
    #[test]
    pub fn mc_buf_reader_single() {
        let data = Data::new(60.kilobytes().0);

        let mut expected = vec![0; data.len];
        let _ = data.clone().blocking_read(&mut expected[..]);

        let mut a = McBufReader::new(data);

        let a = async_test(async move {
            let a = task::run(async move {
                let mut buf = vec![];
                a.read_to_end(&mut buf).await.unwrap();
                buf
            });

            a.await
        });

        crate::assert_vec_eq!(expected, a);
    }

    // Six readers created upfront read one after the other, each must get the full data.
    #[test]
    pub fn mc_buf_reader_sequential() {
        let data = Data::new(60.kilobytes().0);

        let mut expected = vec![0; data.len];
        let _ = data.clone().blocking_read(&mut expected[..]);

        let mut clones = vec![McBufReader::new(data)];
        for _ in 0..5 {
            clones.push(clones[0].clone());
        }

        let r = async_test(async move {
            let mut r = vec![];

            for mut clone in clones {
                let mut buf = vec![];
                clone.read_to_end(&mut buf).await.unwrap();
                r.push(buf);
            }

            r
        });

        for r in r {
            crate::assert_vec_eq!(expected, r);
        }
    }

    // A clone made after the reader consumed everything starts at the end and reads nothing.
    #[test]
    pub fn mc_buf_reader_completed() {
        let data = Data::new(60.kilobytes().0);
        let mut buf = Vec::with_capacity(data.len);
        let mut a = McBufReader::new(data);

        let r = async_test(async move {
            a.read_to_end(&mut buf).await.unwrap();

            let mut b = a.clone();
            buf.clear();

            b.read_to_end(&mut buf).await.unwrap();
            buf.len()
        });

        assert_eq!(0, r);
    }

    // The source fails after the data, both concurrent readers must get the error kind.
    #[test]
    pub fn mc_buf_reader_error() {
        let mut data = Data::new(20.kilobytes().0);
        data.set_error();

        let mut expected = vec![0; data.len];
        let _ = data.clone().blocking_read(&mut expected[..]);

        let mut a = McBufReader::new(data);
        let mut b = a.clone();

        let (a, b) = async_test(async move {
            let a = task::run(async move {
                let mut buf = vec![];
                a.read_to_end(&mut buf).await.unwrap_err()
            });
            let b = task::run(async move {
                let mut buf: Vec<u8> = vec![];
                b.read_to_end(&mut buf).await.unwrap_err()
            });

            task::all!(a, b).await
        });

        assert_eq!(ErrorKind::InvalidData, a.kind());
        assert_eq!(ErrorKind::InvalidData, b.kind());
    }

    // A clone made after the source failed must get the same error kind.
    #[test]
    pub fn mc_buf_reader_error_completed() {
        let mut data = Data::new(20.kilobytes().0);
        data.set_error();

        let mut buf = Vec::with_capacity(data.len);
        let mut a = McBufReader::new(data);

        let (a, b) = async_test(async move {
            let a_err = a.read_to_end(&mut buf).await.unwrap_err();

            let mut b = a.clone();
            buf.clear();

            let b_err = b.read_to_end(&mut buf).await.unwrap_err();

            (a_err, b_err)
        });

        assert_eq!(ErrorKind::InvalidData, a.kind());
        assert_eq!(ErrorKind::InvalidData, b.kind());
    }

    // Same as the parallel test, but the source returns `Pending` on every other poll.
    #[test]
    pub fn mc_buf_reader_parallel_with_delay1() {
        let mut data = Data::new(60.kilobytes().0);
        data.enable_pending();

        let mut expected = vec![0; data.len];
        let _ = data.clone().blocking_read(&mut expected[..]);

        let mut a = McBufReader::new(data);
        let mut b = a.clone();
        let mut c = a.clone();

        let (a, b, c) = async_test(async move {
            let a = task::run(async move {
                let mut buf = vec![];
                a.read_to_end(&mut buf).await.unwrap();
                buf
            });
            let b = task::run(async move {
                let mut buf: Vec<u8> = vec![];
                b.read_to_end(&mut buf).await.unwrap();
                buf
            });
            let c = task::run(async move {
                let mut buf: Vec<u8> = vec![];
                c.read_to_end(&mut buf).await.unwrap();
                buf
            });

            task::all!(a, b, c).await
        });

        crate::assert_vec_eq!(expected, a);
        crate::assert_vec_eq!(expected, b);
        crate::assert_vec_eq!(expected, c);
    }

    // Same as the previous test, but reader `b` also waits 5ms before it starts reading.
    #[test]
    pub fn mc_buf_reader_parallel_with_delay2() {
        let mut data = Data::new(60.kilobytes().0);
        data.enable_pending();

        let mut expected = vec![0; data.len];
        let _ = data.clone().blocking_read(&mut expected[..]);

        let mut a = McBufReader::new(data);
        let mut b = a.clone();
        let mut c = a.clone();

        let (a, b, c) = async_test(async move {
            let a = task::run(async move {
                let mut buf = vec![];
                a.read_to_end(&mut buf).await.unwrap();
                buf
            });
            let b = task::run(async move {
                let mut buf: Vec<u8> = vec![];
                task::deadline(5.ms()).await;
                b.read_to_end(&mut buf).await.unwrap();
                buf
            });
            let c = task::run(async move {
                let mut buf: Vec<u8> = vec![];
                c.read_to_end(&mut buf).await.unwrap();
                buf
            });

            task::all!(a, b, c).await
        });

        crate::assert_vec_eq!(expected, a);
        crate::assert_vec_eq!(expected, b);
        crate::assert_vec_eq!(expected, c);
    }

    // Test source that generates `len` bytes, each byte is the previous plus one, wrapping.
    #[derive(Clone)]
    struct Data {
        // next byte value to generate
        b: u8,
        // bytes left to generate
        len: usize,
        // error returned by reads made after all bytes were generated
        error: Option<CloneableError>,
        // if not zero every other poll returns `Pending` and wakes after this delay
        delay: Duration,
        // toggled on every poll when `delay` is not zero
        pending: bool,
    }
    impl Data {
        // New source of `len` bytes, with no error and no delay.
        pub fn new(len: usize) -> Self {
            Self {
                b: 0,
                len,
                error: None,
                delay: 0.ms(),
                pending: false,
            }
        }
        // Fills `buf` with up to the remaining length and returns the number of bytes written.
        //
        // If no bytes were left when called returns the error, if one is set.
        pub fn blocking_read(&mut self, buf: &mut [u8]) -> Result<usize> {
            let len = self.len;
            for b in buf.iter_mut().take(len) {
                *b = self.b;
                self.len -= 1;
                self.b = self.b.wrapping_add(1);
            }

            if len == 0 {
                if let Some(e) = &self.error {
                    return e.err();
                }
            }

            Ok(buf.len().min(len))
        }
        // Makes reads fail with an `InvalidData` error after all bytes are generated.
        pub fn set_error(&mut self) {
            self.error = Some(CloneableError::new(&Error::new(ErrorKind::InvalidData, "test error")));
        }

        // Makes every other poll return `Pending`, waking the task 3ms later.
        pub fn enable_pending(&mut self) {
            self.delay = 3.ms();
        }
    }
    impl AsyncRead for Data {
        fn poll_read(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
            if self.delay > Duration::ZERO {
                // alternate between pending and ready
                self.pending = !self.pending;
                if self.pending {
                    let waker = cx.waker().clone();
                    let delay = self.delay;
                    task::spawn(async move {
                        task::deadline(delay).await;
                        waker.wake();
                    });
                    return Poll::Pending;
                }
            }

            let r = self.as_mut().blocking_read(buf);
            Poll::Ready(r)
        }
    }

    // Blocks on the `test` future, panics if it does not finish in 5 seconds.
    #[track_caller]
    fn async_test<F>(test: F) -> F::Output
    where
        F: Future,
    {
        task::block_on(task::with_deadline(test, 5.secs())).unwrap()
    }

    // Asserts that two vectors are equal, the panic message has the lengths and
    // the first index that differs instead of the full data.
    #[macro_export]
    macro_rules! assert_vec_eq {
        ($a:expr, $b: expr) => {
            match (&$a, &$b) {
                (ref a, ref b) => {
                    let len_not_eq = a.len() != b.len();
                    let mut data_not_eq = None;
                    for (i, (a, b)) in a.iter().zip(b.iter()).enumerate() {
                        if a != b {
                            data_not_eq = Some(i);
                            break;
                        }
                    }

                    if len_not_eq || data_not_eq.is_some() {
                        // brings `fmt::Write` into scope for `write!` on the `String`
                        use std::fmt::*;

                        let mut error = format!("`{}` != `{}`", stringify!($a), stringify!($b));
                        if len_not_eq {
                            let _ = write!(&mut error, "\n lengths not equal: {} != {}", a.len(), b.len());
                        }
                        if let Some(i) = data_not_eq {
                            let _ = write!(&mut error, "\n data not equal at index {}: {} != {:?}", i, a[i], b[i]);
                        }
                        panic!("{error}")
                    }
                }
            }
        };
    }
}