zng_task/
io.rs

1//! IO tasks.
2//!
3//! Most of the types in this module are re-exported from [`futures_lite::io`].
4//!
5//! [`futures_lite::io`]: https://docs.rs/futures-lite/latest/futures_lite/io/index.html
6
7use std::{
8    fmt,
9    io::ErrorKind,
10    pin::Pin,
11    sync::Arc,
12    task::{self, Poll},
13    time::Duration,
14};
15
16use crate::{McWaker, Progress};
17
18#[doc(no_inline)]
19pub use futures_lite::io::{
20    AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BoxedReader, BoxedWriter,
21    BufReader, BufWriter, Cursor, ReadHalf, WriteHalf, copy, empty, repeat, sink, split,
22};
23use parking_lot::Mutex;
24use std::io::{Error, Result};
25use zng_time::{DInstant, INSTANT};
26use zng_txt::formatx;
27use zng_unit::{ByteLength, ByteUnits};
28use zng_var::impl_from_and_into_var;
29
30/// Measure read/write of an async task.
31///
32/// Metrics are updated after each read/write, if you read/write all bytes in one call
33/// the metrics will only update once.
34pub struct Measure<T> {
35    task: T,
36    metrics: Metrics,
37    start_time: DInstant,
38    last_write: DInstant,
39    last_read: DInstant,
40}
41impl<T> Measure<T> {
42    /// Start measuring a new read/write task.
43    pub fn start(task: T, total_read: impl Into<ByteLength>, total_write: impl Into<ByteLength>) -> Self {
44        Self::resume(task, (0, total_read), (0, total_write))
45    }
46
47    /// Continue measuring a read/write task.
48    pub fn resume(
49        task: T,
50        read_progress: (impl Into<ByteLength>, impl Into<ByteLength>),
51        write_progress: (impl Into<ByteLength>, impl Into<ByteLength>),
52    ) -> Self {
53        let now = INSTANT.now();
54        Measure {
55            task,
56            metrics: Metrics {
57                read_progress: (read_progress.0.into(), read_progress.1.into()),
58                read_speed: 0.bytes(),
59                write_progress: (write_progress.0.into(), write_progress.1.into()),
60                write_speed: 0.bytes(),
61                total_time: Duration::ZERO,
62            },
63            start_time: now,
64            last_write: now,
65            last_read: now,
66        }
67    }
68
69    /// Current metrics.
70    ///
71    /// This value is updated after every read/write.
72    pub fn metrics(&mut self) -> &Metrics {
73        &self.metrics
74    }
75
76    /// Unwrap the inner task and final metrics.
77    pub fn finish(mut self) -> (T, Metrics) {
78        self.metrics.total_time = self.start_time.elapsed();
79        (self.task, self.metrics)
80    }
81}
82
83fn bytes_per_sec(bytes: ByteLength, elapsed: Duration) -> ByteLength {
84    let bytes_per_sec = bytes.0 as u128 / elapsed.as_nanos() / Duration::from_secs(1).as_nanos();
85    ByteLength(bytes_per_sec as usize)
86}
87
88impl<T: AsyncRead + Unpin> AsyncRead for Measure<T> {
89    fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
90        let self_ = self.get_mut();
91        match Pin::new(&mut self_.task).poll_read(cx, buf) {
92            Poll::Ready(Ok(bytes)) => {
93                if bytes > 0 {
94                    let bytes = bytes.bytes();
95                    self_.metrics.read_progress.0 += bytes;
96
97                    let now = INSTANT.now();
98                    let elapsed = now - self_.last_read;
99
100                    self_.last_read = now;
101                    self_.metrics.read_speed = bytes_per_sec(bytes, elapsed);
102
103                    self_.metrics.total_time = now - self_.start_time;
104                }
105                Poll::Ready(Ok(bytes))
106            }
107            p => p,
108        }
109    }
110}
111impl<T: AsyncWrite + Unpin> AsyncWrite for Measure<T> {
112    fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
113        let self_ = self.get_mut();
114        match Pin::new(&mut self_.task).poll_write(cx, buf) {
115            Poll::Ready(Ok(bytes)) => {
116                if bytes > 0 {
117                    let bytes = bytes.bytes();
118                    self_.metrics.write_progress.0 += bytes;
119
120                    let now = INSTANT.now();
121                    let elapsed = now - self_.last_write;
122
123                    self_.last_write = now;
124                    self_.metrics.write_speed = bytes_per_sec(bytes, elapsed);
125
126                    self_.metrics.total_time = now - self_.start_time;
127                }
128                Poll::Ready(Ok(bytes))
129            }
130            p => p,
131        }
132    }
133
134    fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<()>> {
135        Pin::new(&mut self.get_mut().task).poll_flush(cx)
136    }
137
138    fn poll_close(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<()>> {
139        Pin::new(&mut self.get_mut().task).poll_close(cx)
140    }
141}
142
143/// Information about the state of an async IO task.
144///
145/// Use [`Measure`] to measure a task.
146#[derive(Debug, Clone, PartialEq, Eq)]
147#[non_exhaustive]
148pub struct Metrics {
149    /// Number of bytes read / estimated total.
150    pub read_progress: (ByteLength, ByteLength),
151
152    /// Average read speed in bytes/second.
153    pub read_speed: ByteLength,
154
155    /// Number of bytes written / estimated total.
156    pub write_progress: (ByteLength, ByteLength),
157
158    /// Average write speed in bytes/second.
159    pub write_speed: ByteLength,
160
161    /// Total time for the entire task. This will continuously increase until
162    /// the task is finished.
163    pub total_time: Duration,
164}
165impl Metrics {
166    /// All zeros.
167    pub fn zero() -> Self {
168        Self {
169            read_progress: (0.bytes(), 0.bytes()),
170            read_speed: 0.bytes(),
171            write_progress: (0.bytes(), 0.bytes()),
172            write_speed: 0.bytes(),
173            total_time: Duration::ZERO,
174        }
175    }
176}
177impl fmt::Display for Metrics {
178    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
179        let mut w = false;
180        if self.read_progress.1 > 0.bytes() {
181            w = true;
182            if self.read_progress.0 != self.read_progress.1 {
183                write!(f, "↓ {}-{}, {}/s", self.read_progress.0, self.read_progress.1, self.read_speed)?;
184                w = true;
185            } else {
186                write!(f, "↓ {} . {:?}", self.read_progress.0, self.total_time)?;
187            }
188        }
189        if self.write_progress.1 > 0.bytes() {
190            if w {
191                writeln!(f)?;
192            }
193            if self.write_progress.0 != self.write_progress.1 {
194                write!(f, "↑ {} - {}, {}/s", self.write_progress.0, self.write_progress.1, self.write_speed)?;
195            } else {
196                write!(f, "↑ {} . {:?}", self.write_progress.0, self.total_time)?;
197            }
198        }
199
200        Ok(())
201    }
202}
203impl_from_and_into_var! {
204    fn from(metrics: Metrics) -> Progress {
205        let mut status = Progress::indeterminate();
206        if metrics.read_progress.1 > 0.bytes() {
207            status = Progress::from_n_of(metrics.read_progress.0.0, metrics.read_progress.1.0);
208        }
209        if metrics.write_progress.1 > 0.bytes() {
210            let w_status = Progress::from_n_of(metrics.write_progress.0.0, metrics.write_progress.1.0);
211            if status.is_indeterminate() {
212                status = w_status;
213            } else {
214                status = status.and_fct(w_status.fct());
215            }
216        }
217        status.with_msg(formatx!("{metrics}")).with_meta_mut(|mut m| {
218            m.set(*METRICS_ID, metrics);
219        })
220    }
221}
222
223zng_state_map::static_id! {
224    /// Metrics in a [`Progress::with_meta`] metadata.
225    pub static ref METRICS_ID: zng_state_map::StateId<Metrics>;
226}
227
228/// Extension methods for [`std::io::Error`] to be used with errors returned by [`McBufReader`].
229pub trait McBufErrorExt {
230    /// Returns `true` if this error represents the condition where there are only [`McBufReader::is_lazy`] readers
231    /// left, the buffer is drained and the inner reader is not EOF.
232    ///
233    /// You can recover from this error by turning the reader non-lazy using [`McBufReader::set_lazy`].
234    fn is_only_lazy_left(&self) -> bool;
235}
236impl McBufErrorExt for std::io::Error {
237    fn is_only_lazy_left(&self) -> bool {
238        matches!(self.kind(), ErrorKind::Other) && format!("{self:?}").contains(ONLY_NON_LAZY_ERROR_MSG)
239    }
240}
241const ONLY_NON_LAZY_ERROR_MSG: &str = "no non-lazy readers left to read";
242
243/// Multiple consumer buffered read.
244///
245/// Clone an instance to create a new consumer, already read bytes stay in the buffer until all clones have read it,
246/// clones continue reading from the same offset as the reader they cloned.
247///
248/// A single instance of this reader behaves like a `BufReader`.
249///
250/// # Result
251///
252/// The result is *repeats* ready when `EOF` or an [`Error`] occurs, unfortunately the IO error is not cloneable
253/// so the error is recreated using [`CloneableError`] for subsequent poll attempts.
254///
255/// The inner reader is dropped as soon as it finishes.
256///
257/// # Lazy Clones
258///
259/// You can mark clones as [lazy], lazy clones don't pull from the inner reader, only advance when another clone reads, if
260/// all living clones are lazy they stop reading with an error. You can identify this custom error using the [`McBufErrorExt::is_only_lazy_left`]
261/// extension method.
262///
263/// [lazy]: Self::set_lazy
264pub struct McBufReader<S: AsyncRead> {
265    inner: Arc<Mutex<McBufInner<S>>>,
266    index: usize,
267    lazy: bool,
268}
269struct McBufInner<S: AsyncRead> {
270    source: Option<S>,
271    waker: McWaker,
272    lazy_wakers: Vec<task::Waker>,
273
274    buf: Vec<u8>,
275
276    clones: Vec<usize>,
277    non_lazy_count: usize,
278
279    result: ReadState,
280}
281impl<S: AsyncRead> McBufReader<S> {
282    /// Creates a buffered reader.
283    pub fn new(source: S) -> Self {
284        let mut clones = Vec::with_capacity(2);
285        clones.push(0);
286        McBufReader {
287            inner: Arc::new(Mutex::new(McBufInner {
288                source: Some(source),
289                waker: McWaker::empty(),
290                lazy_wakers: vec![],
291
292                buf: Vec::with_capacity(10.kilobytes().0),
293
294                clones,
295                non_lazy_count: 1,
296
297                result: ReadState::Running,
298            })),
299            index: 0,
300            lazy: false,
301        }
302    }
303
304    /// Returns `true` if this reader does not pull from the inner reader, only advancing when a non-lazy reader advances.
305    ///
306    /// The initial reader is not lazy, only clones of lazy readers are lazy by default.
307    pub fn is_lazy(&self) -> bool {
308        self.lazy
309    }
310
311    /// Sets [`is_lazy`].
312    ///
313    /// [`is_lazy`]: Self::is_lazy
314    pub fn set_lazy(&mut self, lazy: bool) {
315        if self.lazy != lazy {
316            if lazy {
317                self.inner.lock().non_lazy_count -= 1;
318            } else {
319                self.inner.lock().non_lazy_count += 1;
320            }
321            self.lazy = lazy;
322        }
323    }
324}
325impl<S: AsyncRead> Clone for McBufReader<S> {
326    fn clone(&self) -> Self {
327        let mut inner = self.inner.lock();
328
329        let offset = inner.clones[self.index];
330        let index = inner.clones.len();
331        inner.clones.push(offset);
332
333        if !self.lazy {
334            inner.non_lazy_count += 1;
335        }
336
337        Self {
338            inner: self.inner.clone(),
339            index,
340            lazy: self.lazy,
341        }
342    }
343}
344impl<S: AsyncRead> Drop for McBufReader<S> {
345    fn drop(&mut self) {
346        let mut inner = self.inner.lock();
347        inner.clones[self.index] = usize::MAX;
348        if !self.lazy {
349            inner.non_lazy_count -= 1;
350            if inner.non_lazy_count == 0 {
351                // notify lazy so they get the error.
352                for waker in inner.lazy_wakers.drain(..) {
353                    waker.wake();
354                }
355            }
356        }
357    }
358}
359impl<S: AsyncRead> AsyncRead for McBufReader<S> {
360    fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
361        let self_ = self.as_ref();
362        let mut inner = self_.inner.lock();
363        let inner = &mut *inner;
364
365        // ready data for this clone.
366        let mut i = inner.clones[self_.index];
367        let mut ready;
368
369        match &inner.result {
370            ReadState::Running => {
371                // source has not finished yet.
372
373                ready = &inner.buf[i..];
374
375                if ready.is_empty() {
376                    if self.lazy {
377                        if inner.non_lazy_count == 0 {
378                            // user can make this reader non-lazy and try again.
379                            return Poll::Ready(Err(Error::other(ONLY_NON_LAZY_ERROR_MSG)));
380                        } else {
381                            // register waker for after non-lazy poll.
382                            inner.lazy_wakers.push(cx.waker().clone());
383
384                            // wait non-lazy to pull.
385                            return Poll::Pending;
386                        }
387                    }
388
389                    // time to poll source.
390
391                    ready = &[];
392
393                    let waker = match inner.waker.push(cx.waker().clone()) {
394                        Some(w) => w,
395                        None => {
396                            // already polling from another clone.
397                            return Poll::Pending;
398                        }
399                    };
400
401                    let min_i = inner.clones.iter().copied().min().unwrap();
402                    if min_i > 0 {
403                        // reuse front.
404                        inner.buf.copy_within(min_i.., 0);
405                        inner.buf.truncate(inner.buf.len() - min_i);
406
407                        i -= min_i;
408                        for i in &mut inner.clones {
409                            *i -= min_i;
410                        }
411                    }
412
413                    let new_start = inner.buf.len();
414
415                    inner.buf.resize(inner.buf.len() + buf.len().max(10.kilobytes().0), 0);
416
417                    let mut inner_cx = task::Context::from_waker(&waker);
418
419                    // SAFETY: we don't move `source`.
420                    let source = unsafe { Pin::new_unchecked(inner.source.as_mut().unwrap()) };
421                    let result = source.poll_read(&mut inner_cx, &mut inner.buf[new_start..]);
422
423                    match result {
424                        Poll::Ready(result) => {
425                            // notify lazy readers.
426                            for waker in inner.lazy_wakers.drain(..) {
427                                waker.wake();
428                            }
429
430                            match result {
431                                Ok(0) => {
432                                    inner.waker.cancel();
433
434                                    // EOF
435                                    inner.buf.truncate(new_start);
436                                    inner.result = ReadState::Eof;
437                                    inner.source = None;
438
439                                    // continue 'copy ready
440                                }
441                                Ok(read) => {
442                                    inner.waker.cancel();
443
444                                    // Read > 0
445                                    inner.buf.truncate(new_start + read);
446                                    ready = &inner.buf[i..];
447
448                                    // continue 'copy ready
449                                }
450                                Err(e) => {
451                                    inner.waker.cancel();
452
453                                    // Error
454                                    inner.result = ReadState::Err(CloneableError::new(&e));
455                                    inner.buf = vec![];
456                                    inner.source = None;
457
458                                    return Poll::Ready(Err(e));
459                                }
460                            }
461                        }
462
463                        Poll::Pending => {
464                            inner.buf.truncate(new_start);
465                            return Poll::Pending;
466                        }
467                    }
468                }
469            }
470            ReadState::Eof => {
471                ready = &inner.buf[i..];
472
473                // continue 'copy ready
474            }
475            ReadState::Err(e) => return Poll::Ready(e.err()),
476        }
477
478        // 'copy ready
479
480        let max_ready = buf.len().min(ready.len());
481        buf[..max_ready].copy_from_slice(&ready[..max_ready]);
482
483        i += max_ready;
484        inner.clones[self_.index] = i;
485
486        Poll::Ready(Ok(max_ready))
487    }
488}
489
490/// Represents the cloneable parts of an [`Error`].
491///
492/// Unfortunately [`Error`] does not implement clone, this is needed to implemented
493/// IO futures that repeat the ready result after subsequent polls. This type partially
494/// works around the issue by copying enough information to recreate an error that is still useful.
495///
496/// The OS error code, [`ErrorKind`] and display message are preserved. Note that this not an error type,
497/// it must be converted to [`Error`] using `into` or [`err`].
498///
499/// [`err`]: Self::err
500#[derive(Clone)]
501pub struct CloneableError {
502    info: ErrorInfo,
503}
504#[derive(Clone)]
505enum ErrorInfo {
506    OsError(i32),
507    Other(ErrorKind, String),
508}
509impl CloneableError {
510    /// Copy the cloneable information from the [`Error`].
511    pub fn new(e: &Error) -> Self {
512        let info = if let Some(code) = e.raw_os_error() {
513            ErrorInfo::OsError(code)
514        } else {
515            ErrorInfo::Other(e.kind(), format!("{e}"))
516        };
517
518        Self { info }
519    }
520
521    /// Returns an `Err(Error)` generated from the cloneable information.
522    pub fn err<T>(&self) -> Result<T> {
523        Err(self.clone().into())
524    }
525}
526impl From<CloneableError> for Error {
527    fn from(e: CloneableError) -> Self {
528        match e.info {
529            ErrorInfo::OsError(code) => Error::from_raw_os_error(code),
530            ErrorInfo::Other(kind, msg) => Error::new(kind, msg),
531        }
532    }
533}
534
535/// Represents a future that generates an error if an `AsyncRead` exceeds a limit.
536pub struct ReadLimited<S, L> {
537    source: S,
538    limit: usize,
539    on_limit: L,
540}
541impl<S, L> ReadLimited<S, L>
542where
543    S: AsyncRead,
544    L: Fn() -> std::io::Error,
545{
546    /// Construct a limited reader.
547    ///
548    /// The `on_limit` closure is called if the limit is reached.
549    pub fn new(source: S, limit: ByteLength, on_limit: L) -> Self {
550        Self {
551            source,
552            limit: limit.0,
553            on_limit,
554        }
555    }
556}
557impl<S, L> AsyncRead for ReadLimited<S, L>
558where
559    S: AsyncRead,
560    L: Fn() -> std::io::Error,
561{
562    fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, mut buf: &mut [u8]) -> Poll<Result<usize>> {
563        // SAFETY: we don't move anything.
564        let self_ = unsafe { self.get_unchecked_mut() };
565
566        if self_.limit == 0 {
567            let err = (self_.on_limit)();
568            return Poll::Ready(Err(err));
569        }
570
571        if buf.len() > self_.limit {
572            buf = &mut buf[..self_.limit];
573        }
574
575        // SAFETY: we never move `source`.
576        match unsafe { Pin::new_unchecked(&mut self_.source) }.poll_read(cx, buf) {
577            Poll::Ready(Ok(l)) => {
578                self_.limit = self_.limit.saturating_sub(l);
579                if self_.limit == 0 {
580                    let err = (self_.on_limit)();
581                    Poll::Ready(Err(err))
582                } else {
583                    Poll::Ready(Ok(l))
584                }
585            }
586            r => r,
587        }
588    }
589}
590
591enum ReadState {
592    Running,
593    Eof,
594    Err(CloneableError),
595}
596
597#[cfg(test)]
598mod tests {
599    use super::*;
600    use crate as task;
601    use zng_unit::TimeUnits;
602
603    #[test]
604    pub fn mc_buf_reader_parallel() {
605        let data = Data::new(60.kilobytes().0);
606
607        let mut expected = vec![0; data.len];
608        let _ = data.clone().blocking_read(&mut expected[..]);
609
610        let mut a = McBufReader::new(data);
611        let mut b = a.clone();
612        let mut c = a.clone();
613
614        let (a, b, c) = async_test(async move {
615            let a = task::run(async move {
616                let mut buf = vec![];
617                a.read_to_end(&mut buf).await.unwrap();
618                buf
619            });
620            let b = task::run(async move {
621                let mut buf: Vec<u8> = vec![];
622                b.read_to_end(&mut buf).await.unwrap();
623                buf
624            });
625            let c = task::run(async move {
626                let mut buf: Vec<u8> = vec![];
627                c.read_to_end(&mut buf).await.unwrap();
628                buf
629            });
630
631            task::all!(a, b, c).await
632        });
633
634        crate::assert_vec_eq!(expected, a);
635        crate::assert_vec_eq!(expected, b);
636        crate::assert_vec_eq!(expected, c);
637    }
638
639    #[test]
640    pub fn mc_buf_reader_single() {
641        let data = Data::new(60.kilobytes().0);
642
643        let mut expected = vec![0; data.len];
644        let _ = data.clone().blocking_read(&mut expected[..]);
645
646        let mut a = McBufReader::new(data);
647
648        let a = async_test(async move {
649            let a = task::run(async move {
650                let mut buf = vec![];
651                a.read_to_end(&mut buf).await.unwrap();
652                buf
653            });
654
655            a.await
656        });
657
658        crate::assert_vec_eq!(expected, a);
659    }
660
661    #[test]
662    pub fn mc_buf_reader_sequential() {
663        let data = Data::new(60.kilobytes().0);
664
665        let mut expected = vec![0; data.len];
666        let _ = data.clone().blocking_read(&mut expected[..]);
667
668        let mut clones = vec![McBufReader::new(data)];
669        for _ in 0..5 {
670            clones.push(clones[0].clone());
671        }
672
673        let r = async_test(async move {
674            let mut r = vec![];
675
676            for mut clone in clones {
677                let mut buf = vec![];
678                clone.read_to_end(&mut buf).await.unwrap();
679                r.push(buf);
680            }
681
682            r
683        });
684
685        for r in r {
686            crate::assert_vec_eq!(expected, r);
687        }
688    }
689
690    #[test]
691    pub fn mc_buf_reader_completed() {
692        let data = Data::new(60.kilobytes().0);
693        let mut buf = Vec::with_capacity(data.len);
694        let mut a = McBufReader::new(data);
695
696        let r = async_test(async move {
697            a.read_to_end(&mut buf).await.unwrap();
698
699            let mut b = a.clone();
700            buf.clear();
701
702            b.read_to_end(&mut buf).await.unwrap();
703            buf.len()
704        });
705
706        assert_eq!(0, r);
707    }
708
709    #[test]
710    pub fn mc_buf_reader_error() {
711        let mut data = Data::new(20.kilobytes().0);
712        data.set_error();
713
714        let mut expected = vec![0; data.len];
715        let _ = data.clone().blocking_read(&mut expected[..]);
716
717        let mut a = McBufReader::new(data);
718        let mut b = a.clone();
719
720        let (a, b) = async_test(async move {
721            let a = task::run(async move {
722                let mut buf = vec![];
723                a.read_to_end(&mut buf).await.unwrap_err()
724            });
725            let b = task::run(async move {
726                let mut buf: Vec<u8> = vec![];
727                b.read_to_end(&mut buf).await.unwrap_err()
728            });
729
730            task::all!(a, b).await
731        });
732
733        assert_eq!(ErrorKind::InvalidData, a.kind());
734        assert_eq!(ErrorKind::InvalidData, b.kind());
735    }
736
737    #[test]
738    pub fn mc_buf_reader_error_completed() {
739        let mut data = Data::new(20.kilobytes().0);
740        data.set_error();
741
742        let mut buf = Vec::with_capacity(data.len);
743        let mut a = McBufReader::new(data);
744
745        let (a, b) = async_test(async move {
746            let a_err = a.read_to_end(&mut buf).await.unwrap_err();
747
748            let mut b = a.clone();
749            buf.clear();
750
751            let b_err = b.read_to_end(&mut buf).await.unwrap_err();
752
753            (a_err, b_err)
754        });
755
756        assert_eq!(ErrorKind::InvalidData, a.kind());
757        assert_eq!(ErrorKind::InvalidData, b.kind());
758    }
759
760    #[test]
761    pub fn mc_buf_reader_parallel_with_delay1() {
762        let mut data = Data::new(60.kilobytes().0);
763        data.enable_pending();
764
765        let mut expected = vec![0; data.len];
766        let _ = data.clone().blocking_read(&mut expected[..]);
767
768        let mut a = McBufReader::new(data);
769        let mut b = a.clone();
770        let mut c = a.clone();
771
772        let (a, b, c) = async_test(async move {
773            let a = task::run(async move {
774                let mut buf = vec![];
775                a.read_to_end(&mut buf).await.unwrap();
776                buf
777            });
778            let b = task::run(async move {
779                let mut buf: Vec<u8> = vec![];
780                b.read_to_end(&mut buf).await.unwrap();
781                buf
782            });
783            let c = task::run(async move {
784                let mut buf: Vec<u8> = vec![];
785                c.read_to_end(&mut buf).await.unwrap();
786                buf
787            });
788
789            task::all!(a, b, c).await
790        });
791
792        crate::assert_vec_eq!(expected, a);
793        crate::assert_vec_eq!(expected, b);
794        crate::assert_vec_eq!(expected, c);
795    }
796
797    #[test]
798    pub fn mc_buf_reader_parallel_with_delay2() {
799        let mut data = Data::new(60.kilobytes().0);
800        data.enable_pending();
801
802        let mut expected = vec![0; data.len];
803        let _ = data.clone().blocking_read(&mut expected[..]);
804
805        let mut a = McBufReader::new(data);
806        let mut b = a.clone();
807        let mut c = a.clone();
808
809        let (a, b, c) = async_test(async move {
810            let a = task::run(async move {
811                let mut buf = vec![];
812                a.read_to_end(&mut buf).await.unwrap();
813                buf
814            });
815            let b = task::run(async move {
816                let mut buf: Vec<u8> = vec![];
817                task::deadline(5.ms()).await;
818                b.read_to_end(&mut buf).await.unwrap();
819                buf
820            });
821            let c = task::run(async move {
822                let mut buf: Vec<u8> = vec![];
823                c.read_to_end(&mut buf).await.unwrap();
824                buf
825            });
826
827            task::all!(a, b, c).await
828        });
829
830        crate::assert_vec_eq!(expected, a);
831        crate::assert_vec_eq!(expected, b);
832        crate::assert_vec_eq!(expected, c);
833    }
834
835    #[derive(Clone)]
836    struct Data {
837        b: u8,
838        len: usize,
839        error: Option<CloneableError>,
840        delay: Duration,
841        pending: bool,
842    }
843    impl Data {
844        pub fn new(len: usize) -> Self {
845            Self {
846                b: 0,
847                len,
848                error: None,
849                delay: 0.ms(),
850                pending: false,
851            }
852        }
853        pub fn blocking_read(&mut self, buf: &mut [u8]) -> Result<usize> {
854            let len = self.len;
855            for b in buf.iter_mut().take(len) {
856                *b = self.b;
857                self.len -= 1;
858                self.b = self.b.wrapping_add(1);
859            }
860
861            if len == 0
862                && let Some(e) = &self.error
863            {
864                return e.err();
865            }
866
867            Ok(buf.len().min(len))
868        }
869        pub fn set_error(&mut self) {
870            self.error = Some(CloneableError::new(&Error::new(ErrorKind::InvalidData, "test error")));
871        }
872
873        pub fn enable_pending(&mut self) {
874            self.delay = 3.ms();
875        }
876    }
877    impl AsyncRead for Data {
878        fn poll_read(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
879            if self.delay > Duration::ZERO {
880                self.pending = !self.pending;
881                if self.pending {
882                    let waker = cx.waker().clone();
883                    let delay = self.delay;
884                    task::spawn(async move {
885                        task::deadline(delay).await;
886                        waker.wake();
887                    });
888                    return Poll::Pending;
889                }
890            }
891
892            let r = self.as_mut().blocking_read(buf);
893            Poll::Ready(r)
894        }
895    }
896
897    #[track_caller]
898    fn async_test<F>(test: F) -> F::Output
899    where
900        F: Future,
901    {
902        task::block_on(task::with_deadline(test, 5.secs())).unwrap()
903    }
904
905    /// Assert vector equality with better error message.
906    #[macro_export]
907    macro_rules! assert_vec_eq {
908        ($a:expr, $b: expr) => {
909            match (&$a, &$b) {
910                (ref a, ref b) => {
911                    let len_not_eq = a.len() != b.len();
912                    let mut data_not_eq = None;
913                    for (i, (a, b)) in a.iter().zip(b.iter()).enumerate() {
914                        if a != b {
915                            data_not_eq = Some(i);
916                            break;
917                        }
918                    }
919
920                    if len_not_eq || data_not_eq.is_some() {
921                        use std::fmt::*;
922
923                        let mut error = format!("`{}` != `{}`", stringify!($a), stringify!($b));
924                        if len_not_eq {
925                            let _ = write!(&mut error, "\n  lengths not equal: {} != {}", a.len(), b.len());
926                        }
927                        if let Some(i) = data_not_eq {
928                            let _ = write!(&mut error, "\n  data not equal at index {}: {} != {:?}", i, a[i], b[i]);
929                        }
930                        panic!("{error}")
931                    }
932                }
933            }
934        };
935    }
936}