zng_task/
io.rs

1//! IO tasks.
2//!
3//! Most of the types in this module are re-exported from [`futures_lite::io`].
4//!
5//! [`futures_lite::io`]: https://docs.rs/futures-lite/latest/futures_lite/io/index.html
6
7use std::{
8    fmt,
9    io::ErrorKind,
10    pin::Pin,
11    sync::Arc,
12    task::{self, Poll},
13    time::Duration,
14};
15
16use crate::{McWaker, Progress};
17
18#[doc(no_inline)]
19pub use futures_lite::io::{
20    AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BoxedReader, BoxedWriter,
21    BufReader, BufWriter, Cursor, ReadHalf, WriteHalf, copy, empty, repeat, sink, split,
22};
23use parking_lot::Mutex;
24use std::io::{Error, Result};
25use zng_time::{DInstant, INSTANT};
26use zng_txt::formatx;
27use zng_unit::{ByteLength, ByteUnits};
28use zng_var::impl_from_and_into_var;
29
/// Measure read/write of an async task.
///
/// Metrics are updated after each read/write, if you read/write all bytes in one call
/// the metrics will only update once.
pub struct Measure<T> {
    // The wrapped reader and/or writer, all IO calls are forwarded to it.
    task: T,
    // Latest metrics, returned by `metrics` and `finish`.
    metrics: Metrics,
    // Instant the measurement was started (or resumed), base for `Metrics::total_time`.
    start_time: DInstant,
    // Instant of the most recent write that transferred at least one byte,
    // initially the instant the measurement was started.
    last_write: DInstant,
    // Instant of the most recent read that transferred at least one byte,
    // initially the instant the measurement was started.
    last_read: DInstant,
}
41impl<T> Measure<T> {
42    /// Start measuring a new read/write task.
43    pub fn start(task: T, total_read: impl Into<ByteLength>, total_write: impl Into<ByteLength>) -> Self {
44        Self::resume(task, (0, total_read), (0, total_write))
45    }
46
47    /// Continue measuring a read/write task.
48    pub fn resume(
49        task: T,
50        read_progress: (impl Into<ByteLength>, impl Into<ByteLength>),
51        write_progress: (impl Into<ByteLength>, impl Into<ByteLength>),
52    ) -> Self {
53        let now = INSTANT.now();
54        Measure {
55            task,
56            metrics: Metrics {
57                read_progress: (read_progress.0.into(), read_progress.1.into()),
58                read_speed: 0.bytes(),
59                write_progress: (write_progress.0.into(), write_progress.1.into()),
60                write_speed: 0.bytes(),
61                total_time: Duration::ZERO,
62            },
63            start_time: now,
64            last_write: now,
65            last_read: now,
66        }
67    }
68
69    /// Current metrics.
70    ///
71    /// This value is updated after every read/write.
72    pub fn metrics(&mut self) -> &Metrics {
73        &self.metrics
74    }
75
76    /// Unwrap the inner task and final metrics.
77    pub fn finish(mut self) -> (T, Metrics) {
78        self.metrics.total_time = self.start_time.elapsed();
79        (self.task, self.metrics)
80    }
81}
82
83fn bytes_per_sec(bytes: ByteLength, elapsed: Duration) -> ByteLength {
84    let bytes_per_sec = bytes.0 as u128 / elapsed.as_nanos() / Duration::from_secs(1).as_nanos();
85    ByteLength(bytes_per_sec as usize)
86}
87
88impl<T: AsyncRead + Unpin> AsyncRead for Measure<T> {
89    fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
90        let self_ = self.get_mut();
91        match Pin::new(&mut self_.task).poll_read(cx, buf) {
92            Poll::Ready(Ok(bytes)) => {
93                if bytes > 0 {
94                    let bytes = bytes.bytes();
95                    self_.metrics.read_progress.0 += bytes;
96
97                    let now = INSTANT.now();
98                    let elapsed = now - self_.last_read;
99
100                    self_.last_read = now;
101                    self_.metrics.read_speed = bytes_per_sec(bytes, elapsed);
102
103                    self_.metrics.total_time = now - self_.start_time;
104                }
105                Poll::Ready(Ok(bytes))
106            }
107            p => p,
108        }
109    }
110}
111impl<T: AsyncWrite + Unpin> AsyncWrite for Measure<T> {
112    fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
113        let self_ = self.get_mut();
114        match Pin::new(&mut self_.task).poll_write(cx, buf) {
115            Poll::Ready(Ok(bytes)) => {
116                if bytes > 0 {
117                    let bytes = bytes.bytes();
118                    self_.metrics.write_progress.0 += bytes;
119
120                    let now = INSTANT.now();
121                    let elapsed = now - self_.last_write;
122
123                    self_.last_write = now;
124                    self_.metrics.write_speed = bytes_per_sec(bytes, elapsed);
125
126                    self_.metrics.total_time = now - self_.start_time;
127                }
128                Poll::Ready(Ok(bytes))
129            }
130            p => p,
131        }
132    }
133
134    fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<()>> {
135        Pin::new(&mut self.get_mut().task).poll_flush(cx)
136    }
137
138    fn poll_close(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<()>> {
139        Pin::new(&mut self.get_mut().task).poll_close(cx)
140    }
141}
142
/// Information about the state of an async IO task.
///
/// Use [`Measure`] to measure a task.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Metrics {
    /// Number of bytes read / estimated total.
    pub read_progress: (ByteLength, ByteLength),

    /// Read speed in bytes/second.
    ///
    /// When updated by [`Measure`] this is the speed of the most recent read that returned data.
    pub read_speed: ByteLength,

    /// Number of bytes written / estimated total.
    pub write_progress: (ByteLength, ByteLength),

    /// Write speed in bytes/second.
    ///
    /// When updated by [`Measure`] this is the speed of the most recent write that accepted data.
    pub write_speed: ByteLength,

    /// Total time for the entire task. This will continuously increase until
    /// the task is finished.
    pub total_time: Duration,
}
164impl Metrics {
165    /// All zeros.
166    pub fn zero() -> Self {
167        Self {
168            read_progress: (0.bytes(), 0.bytes()),
169            read_speed: 0.bytes(),
170            write_progress: (0.bytes(), 0.bytes()),
171            write_speed: 0.bytes(),
172            total_time: Duration::ZERO,
173        }
174    }
175}
176impl fmt::Display for Metrics {
177    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178        let mut w = false;
179        if self.read_progress.1 > 0.bytes() {
180            w = true;
181            if self.read_progress.0 != self.read_progress.1 {
182                write!(f, "↓ {}-{}, {}/s", self.read_progress.0, self.read_progress.1, self.read_speed)?;
183                w = true;
184            } else {
185                write!(f, "↓ {} . {:?}", self.read_progress.0, self.total_time)?;
186            }
187        }
188        if self.write_progress.1 > 0.bytes() {
189            if w {
190                writeln!(f)?;
191            }
192            if self.write_progress.0 != self.write_progress.1 {
193                write!(f, "↑ {} - {}, {}/s", self.write_progress.0, self.write_progress.1, self.write_speed)?;
194            } else {
195                write!(f, "↑ {} . {:?}", self.write_progress.0, self.total_time)?;
196            }
197        }
198
199        Ok(())
200    }
201}
impl_from_and_into_var! {
    // Converts the metrics into a `Progress` that has the formatted metrics as message
    // and the metrics value itself stored in the metadata under `METRICS_ID`.
    fn from(metrics: Metrics) -> Progress {
        // stays indeterminate if neither direction has an estimated total.
        let mut status = Progress::indeterminate();
        if metrics.read_progress.1 > 0.bytes() {
            // read progress as bytes read of the estimated total.
            status = Progress::from_n_of(metrics.read_progress.0 .0, metrics.read_progress.1 .0);
        }
        if metrics.write_progress.1 > 0.bytes() {
            // write progress as bytes written of the estimated total.
            let w_status = Progress::from_n_of(metrics.write_progress.0 .0, metrics.write_progress.1 .0);
            if status.is_indeterminate() {
                // only the write direction has an estimated total.
                status = w_status;
            } else {
                // both directions have totals, combine the two factors.
                // NOTE(review): presumably `and_fct` merges the read and write factors, confirm in `Progress`.
                status = status.and_fct(w_status.fct());
            }
        }
        status.with_msg(formatx!("{metrics}")).with_meta_mut(|mut m| {
            m.set(*METRICS_ID, metrics);
        })
    }
}
221
zng_state_map::static_id! {
    /// ID of the [`Metrics`] value in the [`Progress::with_meta`] metadata.
    ///
    /// The metadata is set when a [`Metrics`] value is converted to [`Progress`].
    pub static ref METRICS_ID: zng_state_map::StateId<Metrics>;
}
226
/// Extension methods for [`std::io::Error`] to be used with errors returned by [`McBufReader`].
pub trait McBufErrorExt {
    /// Returns `true` if this error represents the condition where there are only [`McBufReader::is_lazy`] readers
    /// left, the buffer is drained and the inner reader is not EOF.
    ///
    /// You can recover from this error by turning the reader non-lazy using [`McBufReader::set_lazy`].
    fn is_only_lazy_left(&self) -> bool;
}
impl McBufErrorExt for std::io::Error {
    fn is_only_lazy_left(&self) -> bool {
        // the error is identified by its kind and by the custom message.
        if self.kind() != ErrorKind::Other {
            return false;
        }
        let debug_repr = format!("{self:?}");
        debug_repr.contains(ONLY_NON_LAZY_ERROR_MSG)
    }
}
// Message of the error returned to lazy readers when no non-lazy reader is left.
const ONLY_NON_LAZY_ERROR_MSG: &str = "no non-lazy readers left to read";
241
/// Multiple consumer buffered read.
///
/// Clone an instance to create a new consumer, already read bytes stay in the buffer until all clones have read them,
/// clones continue reading from the same offset as the reader they were cloned from.
///
/// A single instance of this reader behaves like a `BufReader`.
///
/// # Result
///
/// The result *repeats* as ready after `EOF` or an [`Error`] occurs, unfortunately the IO error is not cloneable
/// so the error is recreated using [`CloneableError`] for subsequent poll attempts.
///
/// The inner reader is dropped as soon as it finishes.
///
/// # Lazy Clones
///
/// You can mark clones as [lazy], lazy clones don't pull from the inner reader, they only advance when another clone reads. If
/// all living clones are lazy they stop reading with an error, you can identify this custom error using the
/// [`McBufErrorExt::is_only_lazy_left`] extension method.
///
/// [lazy]: Self::set_lazy
pub struct McBufReader<S: AsyncRead> {
    // State shared by all clones.
    inner: Arc<Mutex<McBufInner<S>>>,
    // Index of this clone's read offset in `McBufInner::clones`.
    index: usize,
    // If this clone does not pull from the inner reader.
    lazy: bool,
}
// State shared by all clones of a `McBufReader`.
struct McBufInner<S: AsyncRead> {
    // Inner reader, set to `None` after it returns EOF or an error.
    source: Option<S>,
    // Waker given to the source, collects the wakers of the non-lazy clones waiting for it.
    // NOTE(review): exact `McWaker` semantics are defined in the crate root, confirm there.
    waker: McWaker,
    // Wakers of lazy clones waiting for a non-lazy clone to pull more data.
    lazy_wakers: Vec<task::Waker>,

    // Bytes read from the source that at least one clone may still need.
    buf: Vec<u8>,

    // Read offset in `buf` of each clone, indexed by `McBufReader::index`,
    // dropped clones are marked with `usize::MAX`.
    clones: Vec<usize>,
    // Number of living clones that are not lazy.
    non_lazy_count: usize,

    // If the source is still running, finished or failed.
    result: ReadState,
}
280impl<S: AsyncRead> McBufReader<S> {
281    /// Creates a buffered reader.
282    pub fn new(source: S) -> Self {
283        let mut clones = Vec::with_capacity(2);
284        clones.push(0);
285        McBufReader {
286            inner: Arc::new(Mutex::new(McBufInner {
287                source: Some(source),
288                waker: McWaker::empty(),
289                lazy_wakers: vec![],
290
291                buf: Vec::with_capacity(10.kilobytes().0),
292
293                clones,
294                non_lazy_count: 1,
295
296                result: ReadState::Running,
297            })),
298            index: 0,
299            lazy: false,
300        }
301    }
302
303    /// Returns `true` if this reader does not pull from the inner reader, only advancing when a non-lazy reader advances.
304    ///
305    /// The initial reader is not lazy, only clones of lazy readers are lazy by default.
306    pub fn is_lazy(&self) -> bool {
307        self.lazy
308    }
309
310    /// Sets [`is_lazy`].
311    ///
312    /// [`is_lazy`]: Self::is_lazy
313    pub fn set_lazy(&mut self, lazy: bool) {
314        if self.lazy != lazy {
315            if lazy {
316                self.inner.lock().non_lazy_count -= 1;
317            } else {
318                self.inner.lock().non_lazy_count += 1;
319            }
320            self.lazy = lazy;
321        }
322    }
323}
324impl<S: AsyncRead> Clone for McBufReader<S> {
325    fn clone(&self) -> Self {
326        let mut inner = self.inner.lock();
327
328        let offset = inner.clones[self.index];
329        let index = inner.clones.len();
330        inner.clones.push(offset);
331
332        if !self.lazy {
333            inner.non_lazy_count += 1;
334        }
335
336        Self {
337            inner: self.inner.clone(),
338            index,
339            lazy: self.lazy,
340        }
341    }
342}
343impl<S: AsyncRead> Drop for McBufReader<S> {
344    fn drop(&mut self) {
345        let mut inner = self.inner.lock();
346        inner.clones[self.index] = usize::MAX;
347        if !self.lazy {
348            inner.non_lazy_count -= 1;
349            if inner.non_lazy_count == 0 {
350                // notify lazy so they get the error.
351                for waker in inner.lazy_wakers.drain(..) {
352                    waker.wake();
353                }
354            }
355        }
356    }
357}
358impl<S: AsyncRead> AsyncRead for McBufReader<S> {
359    fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
360        let self_ = self.as_ref();
361        let mut inner = self_.inner.lock();
362        let inner = &mut *inner;
363
364        // ready data for this clone.
365        let mut i = inner.clones[self_.index];
366        let mut ready;
367
368        match &inner.result {
369            ReadState::Running => {
370                // source has not finished yet.
371
372                ready = &inner.buf[i..];
373
374                if ready.is_empty() {
375                    if self.lazy {
376                        if inner.non_lazy_count == 0 {
377                            // user can make this reader non-lazy and try again.
378                            return Poll::Ready(Err(Error::new(ErrorKind::Other, ONLY_NON_LAZY_ERROR_MSG)));
379                        } else {
380                            // register waker for after non-lazy poll.
381                            inner.lazy_wakers.push(cx.waker().clone());
382
383                            // wait non-lazy to pull.
384                            return Poll::Pending;
385                        }
386                    }
387
388                    // time to poll source.
389
390                    ready = &[];
391
392                    let waker = match inner.waker.push(cx.waker().clone()) {
393                        Some(w) => w,
394                        None => {
395                            // already polling from another clone.
396                            return Poll::Pending;
397                        }
398                    };
399
400                    let min_i = inner.clones.iter().copied().min().unwrap();
401                    if min_i > 0 {
402                        // reuse front.
403                        inner.buf.copy_within(min_i.., 0);
404                        inner.buf.truncate(inner.buf.len() - min_i);
405
406                        i -= min_i;
407                        for i in &mut inner.clones {
408                            *i -= min_i;
409                        }
410                    }
411
412                    let new_start = inner.buf.len();
413
414                    inner.buf.resize(inner.buf.len() + buf.len().max(10.kilobytes().0), 0);
415
416                    let mut inner_cx = task::Context::from_waker(&waker);
417
418                    // SAFETY: we don't move `source`.
419                    let source = unsafe { Pin::new_unchecked(inner.source.as_mut().unwrap()) };
420                    let result = source.poll_read(&mut inner_cx, &mut inner.buf[new_start..]);
421
422                    match result {
423                        Poll::Ready(result) => {
424                            // notify lazy readers.
425                            for waker in inner.lazy_wakers.drain(..) {
426                                waker.wake();
427                            }
428
429                            match result {
430                                Ok(0) => {
431                                    inner.waker.cancel();
432
433                                    // EOF
434                                    inner.buf.truncate(new_start);
435                                    inner.result = ReadState::Eof;
436                                    inner.source = None;
437
438                                    // continue 'copy ready
439                                }
440                                Ok(read) => {
441                                    inner.waker.cancel();
442
443                                    // Read > 0
444                                    inner.buf.truncate(new_start + read);
445                                    ready = &inner.buf[i..];
446
447                                    // continue 'copy ready
448                                }
449                                Err(e) => {
450                                    inner.waker.cancel();
451
452                                    // Error
453                                    inner.result = ReadState::Err(CloneableError::new(&e));
454                                    inner.buf = vec![];
455                                    inner.source = None;
456
457                                    return Poll::Ready(Err(e));
458                                }
459                            }
460                        }
461
462                        Poll::Pending => {
463                            inner.buf.truncate(new_start);
464                            return Poll::Pending;
465                        }
466                    }
467                }
468            }
469            ReadState::Eof => {
470                ready = &inner.buf[i..];
471
472                // continue 'copy ready
473            }
474            ReadState::Err(e) => return Poll::Ready(e.err()),
475        }
476
477        // 'copy ready
478
479        let max_ready = buf.len().min(ready.len());
480        buf[..max_ready].copy_from_slice(&ready[..max_ready]);
481
482        i += max_ready;
483        inner.clones[self_.index] = i;
484
485        Poll::Ready(Ok(max_ready))
486    }
487}
488
/// Represents the cloneable parts of an [`Error`].
///
/// Unfortunately [`Error`] does not implement clone, this is needed to implement
/// IO futures that repeat the ready result after subsequent polls. This type partially
/// works around the issue by copying enough information to recreate an error that is still useful.
///
/// The OS error code, [`ErrorKind`] and display message are preserved. Note that this is not an error type,
/// it must be converted to [`Error`] using `into` or [`err`].
///
/// [`err`]: Self::err
#[derive(Debug, Clone)]
pub struct CloneableError {
    info: ErrorInfo,
}
// Information copied from the source error.
#[derive(Debug, Clone)]
enum ErrorInfo {
    // Raw OS error code, the error is recreated from the code.
    OsError(i32),
    // Kind and display message of an error that has no OS error code.
    Other(ErrorKind, String),
}
impl CloneableError {
    /// Copy the cloneable information from the [`Error`].
    ///
    /// If the error has a raw OS error code only the code is copied, otherwise
    /// the kind and the display message are copied.
    pub fn new(e: &Error) -> Self {
        let info = match e.raw_os_error() {
            Some(code) => ErrorInfo::OsError(code),
            None => ErrorInfo::Other(e.kind(), e.to_string()),
        };

        Self { info }
    }

    /// Returns an `Err(Error)` generated from the cloneable information.
    pub fn err<T>(&self) -> Result<T> {
        Err(self.clone().into())
    }
}
impl From<CloneableError> for Error {
    /// Recreates the error from the copied information.
    fn from(e: CloneableError) -> Self {
        match e.info {
            ErrorInfo::OsError(code) => Error::from_raw_os_error(code),
            ErrorInfo::Other(kind, msg) => Error::new(kind, msg),
        }
    }
}
533
/// Represents a future that generates an error if an `AsyncRead` exceeds a limit.
pub struct ReadLimited<S, L> {
    // The reader that is limited.
    source: S,
    // Number of bytes that can still be read before the limit is reached.
    limit: usize,
    // Called to generate the error returned when the limit is reached.
    on_limit: L,
}
540impl<S, L> ReadLimited<S, L>
541where
542    S: AsyncRead,
543    L: Fn() -> std::io::Error,
544{
545    /// Construct a limited reader.
546    ///
547    /// The `on_limit` closure is called if the limit is reached.
548    pub fn new(source: S, limit: ByteLength, on_limit: L) -> Self {
549        Self {
550            source,
551            limit: limit.0,
552            on_limit,
553        }
554    }
555}
556impl<S, L> AsyncRead for ReadLimited<S, L>
557where
558    S: AsyncRead,
559    L: Fn() -> std::io::Error,
560{
561    fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, mut buf: &mut [u8]) -> Poll<Result<usize>> {
562        // SAFETY: we don't move anything.
563        let self_ = unsafe { self.get_unchecked_mut() };
564
565        if self_.limit == 0 {
566            let err = (self_.on_limit)();
567            return Poll::Ready(Err(err));
568        }
569
570        if buf.len() > self_.limit {
571            buf = &mut buf[..self_.limit];
572        }
573
574        // SAFETY: we never move `source`.
575        match unsafe { Pin::new_unchecked(&mut self_.source) }.poll_read(cx, buf) {
576            Poll::Ready(Ok(l)) => {
577                self_.limit = self_.limit.saturating_sub(l);
578                if self_.limit == 0 {
579                    let err = (self_.on_limit)();
580                    Poll::Ready(Err(err))
581                } else {
582                    Poll::Ready(Ok(l))
583                }
584            }
585            r => r,
586        }
587    }
588}
589
// State of the inner reader shared by the clones of a `McBufReader`.
enum ReadState {
    // The source has not finished, more data can be read from it.
    Running,
    // The source returned EOF, clones only copy what is left in the buffer.
    Eof,
    // The source returned an error, a copy of it is returned on every subsequent poll.
    Err(CloneableError),
}
595
596#[cfg(test)]
597mod tests {
598    use super::*;
599    use crate as task;
600    use zng_unit::TimeUnits;
601
    /// Three clones read to the end in parallel tasks, each must get all the data.
    #[test]
    pub fn mc_buf_reader_parallel() {
        let data = Data::new(60.kilobytes().0);

        // expected bytes, generated from a copy of the source.
        let mut expected = vec![0; data.len];
        let _ = data.clone().blocking_read(&mut expected[..]);

        let mut a = McBufReader::new(data);
        let mut b = a.clone();
        let mut c = a.clone();

        let (a, b, c) = async_test(async move {
            let a = task::run(async move {
                let mut buf = vec![];
                a.read_to_end(&mut buf).await.unwrap();
                buf
            });
            let b = task::run(async move {
                let mut buf: Vec<u8> = vec![];
                b.read_to_end(&mut buf).await.unwrap();
                buf
            });
            let c = task::run(async move {
                let mut buf: Vec<u8> = vec![];
                c.read_to_end(&mut buf).await.unwrap();
                buf
            });

            task::all!(a, b, c).await
        });

        crate::assert_vec_eq!(expected, a);
        crate::assert_vec_eq!(expected, b);
        crate::assert_vec_eq!(expected, c);
    }
637
638    #[test]
639    pub fn mc_buf_reader_single() {
640        let data = Data::new(60.kilobytes().0);
641
642        let mut expected = vec![0; data.len];
643        let _ = data.clone().blocking_read(&mut expected[..]);
644
645        let mut a = McBufReader::new(data);
646
647        let a = async_test(async move {
648            let a = task::run(async move {
649                let mut buf = vec![];
650                a.read_to_end(&mut buf).await.unwrap();
651                buf
652            });
653
654            a.await
655        });
656
657        crate::assert_vec_eq!(expected, a);
658    }
659
660    #[test]
661    pub fn mc_buf_reader_sequential() {
662        let data = Data::new(60.kilobytes().0);
663
664        let mut expected = vec![0; data.len];
665        let _ = data.clone().blocking_read(&mut expected[..]);
666
667        let mut clones = vec![McBufReader::new(data)];
668        for _ in 0..5 {
669            clones.push(clones[0].clone());
670        }
671
672        let r = async_test(async move {
673            let mut r = vec![];
674
675            for mut clone in clones {
676                let mut buf = vec![];
677                clone.read_to_end(&mut buf).await.unwrap();
678                r.push(buf);
679            }
680
681            r
682        });
683
684        for r in r {
685            crate::assert_vec_eq!(expected, r);
686        }
687    }
688
689    #[test]
690    pub fn mc_buf_reader_completed() {
691        let data = Data::new(60.kilobytes().0);
692        let mut buf = Vec::with_capacity(data.len);
693        let mut a = McBufReader::new(data);
694
695        let r = async_test(async move {
696            a.read_to_end(&mut buf).await.unwrap();
697
698            let mut b = a.clone();
699            buf.clear();
700
701            b.read_to_end(&mut buf).await.unwrap();
702            buf.len()
703        });
704
705        assert_eq!(0, r);
706    }
707
    /// Two clones read in parallel a source that ends with an error, both must get the error.
    #[test]
    pub fn mc_buf_reader_error() {
        let mut data = Data::new(20.kilobytes().0);
        data.set_error();

        let mut expected = vec![0; data.len];
        let _ = data.clone().blocking_read(&mut expected[..]);

        let mut a = McBufReader::new(data);
        let mut b = a.clone();

        let (a, b) = async_test(async move {
            let a = task::run(async move {
                let mut buf = vec![];
                a.read_to_end(&mut buf).await.unwrap_err()
            });
            let b = task::run(async move {
                let mut buf: Vec<u8> = vec![];
                b.read_to_end(&mut buf).await.unwrap_err()
            });

            task::all!(a, b).await
        });

        // the kind is preserved in the original error and in the recreated copy.
        assert_eq!(ErrorKind::InvalidData, a.kind());
        assert_eq!(ErrorKind::InvalidData, b.kind());
    }
735
736    #[test]
737    pub fn mc_buf_reader_error_completed() {
738        let mut data = Data::new(20.kilobytes().0);
739        data.set_error();
740
741        let mut buf = Vec::with_capacity(data.len);
742        let mut a = McBufReader::new(data);
743
744        let (a, b) = async_test(async move {
745            let a_err = a.read_to_end(&mut buf).await.unwrap_err();
746
747            let mut b = a.clone();
748            buf.clear();
749
750            let b_err = b.read_to_end(&mut buf).await.unwrap_err();
751
752            (a_err, b_err)
753        });
754
755        assert_eq!(ErrorKind::InvalidData, a.kind());
756        assert_eq!(ErrorKind::InvalidData, b.kind());
757    }
758
    /// Three clones read in parallel a source that returns pending on every other poll,
    /// each must get all the data.
    #[test]
    pub fn mc_buf_reader_parallel_with_delay1() {
        let mut data = Data::new(60.kilobytes().0);
        data.enable_pending();

        // expected bytes, generated from a copy of the source.
        let mut expected = vec![0; data.len];
        let _ = data.clone().blocking_read(&mut expected[..]);

        let mut a = McBufReader::new(data);
        let mut b = a.clone();
        let mut c = a.clone();

        let (a, b, c) = async_test(async move {
            let a = task::run(async move {
                let mut buf = vec![];
                a.read_to_end(&mut buf).await.unwrap();
                buf
            });
            let b = task::run(async move {
                let mut buf: Vec<u8> = vec![];
                b.read_to_end(&mut buf).await.unwrap();
                buf
            });
            let c = task::run(async move {
                let mut buf: Vec<u8> = vec![];
                c.read_to_end(&mut buf).await.unwrap();
                buf
            });

            task::all!(a, b, c).await
        });

        crate::assert_vec_eq!(expected, a);
        crate::assert_vec_eq!(expected, b);
        crate::assert_vec_eq!(expected, c);
    }
795
    /// Same as `mc_buf_reader_parallel_with_delay1`, but one of the clones only starts
    /// reading after a delay, it must still get all the data.
    #[test]
    pub fn mc_buf_reader_parallel_with_delay2() {
        let mut data = Data::new(60.kilobytes().0);
        data.enable_pending();

        // expected bytes, generated from a copy of the source.
        let mut expected = vec![0; data.len];
        let _ = data.clone().blocking_read(&mut expected[..]);

        let mut a = McBufReader::new(data);
        let mut b = a.clone();
        let mut c = a.clone();

        let (a, b, c) = async_test(async move {
            let a = task::run(async move {
                let mut buf = vec![];
                a.read_to_end(&mut buf).await.unwrap();
                buf
            });
            let b = task::run(async move {
                let mut buf: Vec<u8> = vec![];
                // start late, the other clones are already reading.
                task::deadline(5.ms()).await;
                b.read_to_end(&mut buf).await.unwrap();
                buf
            });
            let c = task::run(async move {
                let mut buf: Vec<u8> = vec![];
                c.read_to_end(&mut buf).await.unwrap();
                buf
            });

            task::all!(a, b, c).await
        });

        crate::assert_vec_eq!(expected, a);
        crate::assert_vec_eq!(expected, b);
        crate::assert_vec_eq!(expected, c);
    }
833
    /// Test source that generates a sequence of wrapping incrementing bytes.
    #[derive(Clone)]
    struct Data {
        // Value of the next byte generated.
        b: u8,
        // Number of bytes that can still be generated.
        len: usize,
        // Error returned by reads made after all bytes were generated, if set.
        error: Option<CloneableError>,
        // Delay before the waker is called after a pending poll, zero disables pending polls.
        delay: Duration,
        // Toggled on every poll when `delay` is not zero, the poll returns pending when it becomes `true`.
        pending: bool,
    }
842    impl Data {
843        pub fn new(len: usize) -> Self {
844            Self {
845                b: 0,
846                len,
847                error: None,
848                delay: 0.ms(),
849                pending: false,
850            }
851        }
852        pub fn blocking_read(&mut self, buf: &mut [u8]) -> Result<usize> {
853            let len = self.len;
854            for b in buf.iter_mut().take(len) {
855                *b = self.b;
856                self.len -= 1;
857                self.b = self.b.wrapping_add(1);
858            }
859
860            if len == 0 {
861                if let Some(e) = &self.error {
862                    return e.err();
863                }
864            }
865
866            Ok(buf.len().min(len))
867        }
868        pub fn set_error(&mut self) {
869            self.error = Some(CloneableError::new(&Error::new(ErrorKind::InvalidData, "test error")));
870        }
871
872        pub fn enable_pending(&mut self) {
873            self.delay = 3.ms();
874        }
875    }
876    impl AsyncRead for Data {
877        fn poll_read(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
878            if self.delay > Duration::ZERO {
879                self.pending = !self.pending;
880                if self.pending {
881                    let waker = cx.waker().clone();
882                    let delay = self.delay;
883                    task::spawn(async move {
884                        task::deadline(delay).await;
885                        waker.wake();
886                    });
887                    return Poll::Pending;
888                }
889            }
890
891            let r = self.as_mut().blocking_read(buf);
892            Poll::Ready(r)
893        }
894    }
895
896    #[track_caller]
897    fn async_test<F>(test: F) -> F::Output
898    where
899        F: Future,
900    {
901        task::block_on(task::with_deadline(test, 5.secs())).unwrap()
902    }
903
    /// Assert vector equality with better error message.
    ///
    /// Panics if the lengths are different or if any item is different, the message
    /// includes the two lengths and the index and values of the first different item.
    #[macro_export]
    macro_rules! assert_vec_eq {
        ($a:expr, $b: expr) => {
            match (&$a, &$b) {
                (ref a, ref b) => {
                    let len_not_eq = a.len() != b.len();
                    // index of the first different item in the shared length.
                    let mut data_not_eq = None;
                    for (i, (a, b)) in a.iter().zip(b.iter()).enumerate() {
                        if a != b {
                            data_not_eq = Some(i);
                            break;
                        }
                    }

                    if len_not_eq || data_not_eq.is_some() {
                        // `fmt::Write` is needed to `write!` into the `String`.
                        use std::fmt::*;

                        let mut error = format!("`{}` != `{}`", stringify!($a), stringify!($b));
                        if len_not_eq {
                            let _ = write!(&mut error, "\n  lengths not equal: {} != {}", a.len(), b.len());
                        }
                        if let Some(i) = data_not_eq {
                            let _ = write!(&mut error, "\n  data not equal at index {}: {} != {:?}", i, a[i], b[i]);
                        }
                        panic!("{error}")
                    }
                }
            }
        };
    }
935}