zng_task/channel/
ipc_bytes.rs

1#![cfg_attr(not(ipc), allow(unused))]
2
3use std::{
4    cell::Cell,
5    fmt, fs,
6    io::{self, Read, Write},
7    iter::FusedIterator,
8    marker::PhantomData,
9    mem::MaybeUninit,
10    ops,
11    path::{Path, PathBuf},
12    pin::Pin,
13    sync::{Arc, Weak},
14};
15
16use futures_lite::{AsyncReadExt, AsyncWriteExt as _};
17#[cfg(ipc)]
18use ipc_channel::ipc::IpcSharedMemory;
19use parking_lot::Mutex;
20use serde::{Deserialize, Serialize, de::VariantAccess};
21use zng_app_context::RunOnDrop;
22
23/// Immutable bytes vector that can be can be shared fast over IPC.
24///
25/// # Memory Storage
26///
27/// All storage backends are held by a [`Arc`] pointer, so cloning in process is always very cheap.
28///
29/// The `from_*` constructor functions use different storage depending on byte length. Bytes <= 64KB are allocated in the heap
30/// and are copied when shared with another process. Bytes <= 128MB are allocated in an anonymous memory map, only the system handle
31/// is copied when shared with another process. Bytes > 128MB are allocated in a temporary file with restricted access and memory mapped
32/// for read, only the file path and some metadata are copied when shared with another process.
33///
34/// Constructor functions for creating memory maps directly are also provided.
35///
36/// Note that in builds without the `"ipc"` crate feature only heap backend is available, in that case all data lengths are stored in the heap.
37///
38/// # Serialization
39///
40/// When serialized inside [`with_ipc_serialization`] the memory map bytes are not copied, only the system handle and metadata is serialized.
41/// When serialized in other contexts all bytes are copied.
42///
43/// When deserializing memory map handles are reconnected and if deserializing bytes selects the best storage based on data length.
44///
45/// [`IpcSender`]: super::IpcSender
46#[derive(Clone)]
47#[repr(transparent)]
48pub struct IpcBytes(Arc<IpcBytesData>);
49enum IpcBytesData {
50    Heap(Vec<u8>),
51    #[cfg(ipc)]
52    AnonMemMap(IpcSharedMemory),
53    #[cfg(ipc)]
54    MemMap(IpcMemMap),
55}
56#[cfg(ipc)]
57struct IpcMemMap {
58    name: PathBuf,
59    range: ops::Range<usize>,
60    is_custom: bool,
61    map: Option<memmap2::Mmap>,
62    read_handle: Option<fs::File>,
63}
64impl fmt::Debug for IpcBytes {
65    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
66        write!(f, "IpcBytes(<{} bytes>)", self.len())
67    }
68}
69impl ops::Deref for IpcBytes {
70    type Target = [u8];
71
72    fn deref(&self) -> &Self::Target {
73        match &*self.0 {
74            IpcBytesData::Heap(i) => i,
75            #[cfg(ipc)]
76            IpcBytesData::AnonMemMap(m) => m,
77            #[cfg(ipc)]
78            IpcBytesData::MemMap(f) => f.map.as_ref().unwrap(),
79        }
80    }
81}
82
83impl IpcBytes {
84    /// New empty.
85    pub fn empty() -> Self {
86        IpcBytes(Arc::new(IpcBytesData::Heap(vec![])))
87    }
88}
89/// Async constructors.
90impl IpcBytes {
91    /// Start a memory efficient async writer for creating a `IpcBytes` with unknown length.
92    pub async fn new_writer() -> IpcBytesWriter {
93        IpcBytesWriter {
94            inner: blocking::Unblock::new(Self::new_writer_blocking()),
95        }
96    }
97
98    /// Allocate zeroed mutable memory that can be written to and then converted to `IpcBytes` fast.
99    pub async fn new_mut(len: usize) -> io::Result<IpcBytesMut> {
100        IpcBytesMut::new(len).await
101    }
102
103    /// Copy or move data from vector.
104    pub async fn from_vec(data: Vec<u8>) -> io::Result<Self> {
105        blocking::unblock(move || Self::from_vec_blocking(data)).await
106    }
107
108    /// Copy data from the iterator.
109    ///
110    /// This is most efficient if the [`size_hint`] indicates an exact length (min equals max), otherwise this
111    /// will collect to an [`IpcBytesWriter`] that can reallocate multiple times as the buffer grows.
112    ///    
113    /// Note that if the iterator gives an exact length that is the maximum taken, if it ends early the smaller length
114    /// is used, if it continues after the given maximum it is clipped.
115    ///
116    /// [`size_hint`]: Iterator::size_hint
117    pub async fn from_iter(iter: impl Iterator<Item = u8>) -> io::Result<Self> {
118        #[cfg(ipc)]
119        {
120            let (min, max) = iter.size_hint();
121            if let Some(max) = max {
122                if max <= Self::INLINE_MAX {
123                    return Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))));
124                } else if max == min {
125                    let mut r = IpcBytes::new_mut(max).await?;
126                    let mut actual_len = 0;
127                    for (i, b) in r.iter_mut().zip(iter) {
128                        *i = b;
129                        actual_len += 1;
130                    }
131                    r.truncate(actual_len);
132                    return r.finish().await;
133                }
134            }
135
136            let mut writer = Self::new_writer().await;
137            for b in iter {
138                writer.write_all(&[b]).await?;
139            }
140            writer.finish().await
141        }
142
143        #[cfg(not(ipc))]
144        {
145            Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))))
146        }
147    }
148
149    /// Read `data` into shared memory.
150    pub async fn from_read(data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
151        #[cfg(ipc)]
152        {
153            Self::from_read_ipc(data).await
154        }
155        #[cfg(not(ipc))]
156        {
157            let mut data = data;
158            let mut buf = vec![];
159            data.read_to_end(&mut buf).await;
160            Self::from_vec(buf).await
161        }
162    }
163    #[cfg(ipc)]
164    async fn from_read_ipc(mut data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
165        let mut buf = vec![0u8; Self::INLINE_MAX + 1];
166        let mut len = 0;
167
168        // INLINE_MAX read
169        loop {
170            match data.read(&mut buf[len..]).await {
171                Ok(l) => {
172                    if l == 0 {
173                        // is <= INLINE_MAX
174                        buf.truncate(len);
175                        return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
176                    } else {
177                        len += l;
178                        if len == Self::INLINE_MAX + 1 {
179                            // goto UNNAMED_MAX read
180                            break;
181                        }
182                    }
183                }
184                Err(e) => match e.kind() {
185                    io::ErrorKind::WouldBlock => continue,
186                    _ => return Err(e),
187                },
188            }
189        }
190
191        // UNNAMED_MAX read
192        buf.resize(Self::UNNAMED_MAX + 1, 0);
193        loop {
194            match data.read(&mut buf[len..]).await {
195                Ok(l) => {
196                    if l == 0 {
197                        // is <= UNNAMED_MAX
198                        return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
199                    } else {
200                        len += l;
201                        if len == Self::UNNAMED_MAX + 1 {
202                            // goto named file loop
203                            break;
204                        }
205                    }
206                }
207                Err(e) => match e.kind() {
208                    io::ErrorKind::WouldBlock => continue,
209                    _ => return Err(e),
210                },
211            }
212        }
213
214        // named file copy
215        Self::new_memmap(async |m| {
216            use futures_lite::AsyncWriteExt as _;
217
218            m.write_all(&buf).await?;
219            crate::io::copy(data, m).await?;
220            Ok(())
221        })
222        .await
223    }
224
225    /// Read `path` into shared memory.
226    pub async fn from_path(path: PathBuf) -> io::Result<Self> {
227        let file = crate::fs::File::open(path).await?;
228        Self::from_file(file).await
229    }
230    /// Read `file` into shared memory.
231    pub async fn from_file(mut file: crate::fs::File) -> io::Result<Self> {
232        #[cfg(ipc)]
233        {
234            let len = file.metadata().await?.len();
235            if len <= Self::UNNAMED_MAX as u64 {
236                let mut buf = vec![0u8; len as usize];
237                file.read_exact(&mut buf).await?;
238                Self::from_vec_blocking(buf)
239            } else {
240                Self::new_memmap(async move |m| {
241                    crate::io::copy(&mut file, m).await?;
242                    Ok(())
243                })
244                .await
245            }
246        }
247        #[cfg(not(ipc))]
248        {
249            let mut buf = vec![];
250            file.read_to_end(&mut buf).await?;
251            Self::from_vec_blocking(buf)
252        }
253    }
254
255    /// Create a memory mapped file.
256    ///
257    /// Note that the `from_` functions select optimized backing storage depending on data length, this function
258    /// always selects the slowest options, a file backed memory map.
259    #[cfg(ipc)]
260    pub async fn new_memmap(write: impl AsyncFnOnce(&mut crate::fs::File) -> io::Result<()>) -> io::Result<Self> {
261        let (name, file) = blocking::unblock(Self::create_memmap).await?;
262        let mut file = crate::fs::File::from(file);
263        write(&mut file).await?;
264
265        let mut permissions = file.metadata().await?.permissions();
266        permissions.set_readonly(true);
267        #[cfg(unix)]
268        {
269            use std::os::unix::fs::PermissionsExt;
270            permissions.set_mode(0o400);
271        }
272        file.set_permissions(permissions).await?;
273
274        blocking::unblock(move || {
275            drop(file);
276            let map = IpcMemMap::read(name, None)?;
277            Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
278        })
279        .await
280    }
281
282    /// Memory map an existing file.
283    ///
284    /// The `range` defines the slice of the `file` that will be mapped. Returns [`io::ErrorKind::UnexpectedEof`] if the file does not have enough bytes.
285    ///
286    /// # Safety
287    ///
288    /// Caller must ensure the `file` is not modified while all clones of the `IpcBytes` exists in the current process and others.
289    ///
290    /// Note that the safe [`new_memmap`] function assures safety by retaining a read lock (Windows) and restricting access rights (Unix)
291    /// so that the file data is as read-only as the static data in the current executable file.
292    ///
293    /// [`new_memmap`]: Self::new_memmap
294    #[cfg(ipc)]
295    pub async unsafe fn open_memmap(file: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
296        blocking::unblock(move || {
297            // SAFETY: up to the caller
298            unsafe { Self::open_memmap_blocking(file, range) }
299        })
300        .await
301    }
302
303    /// Gets if both point to the same memory.
304    pub fn ptr_eq(&self, other: &Self) -> bool {
305        let a = &self[..];
306        let b = &other[..];
307        (std::ptr::eq(a, b) && a.len() == b.len()) || (a.is_empty() && b.is_empty())
308    }
309
310    #[cfg(ipc)]
311    const INLINE_MAX: usize = 64 * 1024; // 64KB
312    #[cfg(ipc)]
313    const UNNAMED_MAX: usize = 128 * 1024 * 1024; // 128MB
314}
315
316/// Blocking constructors.
317impl IpcBytes {
318    /// Start a memory efficient blocking writer for creating a `IpcBytes` with unknown length.
319    pub fn new_writer_blocking() -> IpcBytesWriterBlocking {
320        IpcBytesWriterBlocking {
321            #[cfg(ipc)]
322            heap_buf: vec![],
323            #[cfg(ipc)]
324            memmap: None,
325
326            #[cfg(not(ipc))]
327            heap_buf: std::io::Cursor::new(vec![]),
328        }
329    }
330
331    /// Allocate zeroed mutable memory that can be written to and then converted to `IpcBytes` fast.
332    pub fn new_mut_blocking(len: usize) -> io::Result<IpcBytesMut> {
333        IpcBytesMut::new_blocking(len)
334    }
335
336    /// Copy data from slice.
337    pub fn from_slice_blocking(data: &[u8]) -> io::Result<Self> {
338        #[cfg(ipc)]
339        {
340            if data.len() <= Self::INLINE_MAX {
341                Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
342            } else if data.len() <= Self::UNNAMED_MAX {
343                Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(data)))))
344            } else {
345                Self::new_memmap_blocking(|m| m.write_all(data))
346            }
347        }
348        #[cfg(not(ipc))]
349        {
350            Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
351        }
352    }
353
354    /// Copy or move data from vector.
355    pub fn from_vec_blocking(data: Vec<u8>) -> io::Result<Self> {
356        #[cfg(ipc)]
357        {
358            if data.len() <= Self::INLINE_MAX {
359                Ok(Self(Arc::new(IpcBytesData::Heap(data))))
360            } else {
361                Self::from_slice_blocking(&data)
362            }
363        }
364        #[cfg(not(ipc))]
365        {
366            Ok(Self(Arc::new(IpcBytesData::Heap(data))))
367        }
368    }
369
370    /// Copy data from the iterator.
371    ///
372    /// This is most efficient if the [`size_hint`] indicates an exact length (min equals max), otherwise this
373    /// will collect to an [`IpcBytesWriterBlocking`] that can reallocate multiple times as the buffer grows.
374    ///
375    /// Note that if the iterator gives an exact length that is the maximum taken, if it ends early the smaller length
376    /// is used, if it continues after the given maximum it is clipped.
377    ///
378    /// [`size_hint`]: Iterator::size_hint
379    pub fn from_iter_blocking(iter: impl Iterator<Item = u8>) -> io::Result<Self> {
380        #[cfg(ipc)]
381        {
382            let (min, max) = iter.size_hint();
383            if let Some(max) = max {
384                if max <= Self::INLINE_MAX {
385                    return Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))));
386                } else if max == min {
387                    let mut r = IpcBytes::new_mut_blocking(max)?;
388                    let mut actual_len = 0;
389                    for (i, b) in r.iter_mut().zip(iter) {
390                        *i = b;
391                        actual_len += 1;
392                    }
393                    r.truncate(actual_len);
394                    return r.finish_blocking();
395                }
396            }
397
398            let mut writer = Self::new_writer_blocking();
399            for b in iter {
400                writer.write_all(&[b])?;
401            }
402            writer.finish()
403        }
404        #[cfg(not(ipc))]
405        {
406            Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))))
407        }
408    }
409
410    /// Read `data` into shared memory.
411    pub fn from_read_blocking(data: &mut dyn io::Read) -> io::Result<Self> {
412        #[cfg(ipc)]
413        {
414            Self::from_read_blocking_ipc(data)
415        }
416        #[cfg(not(ipc))]
417        {
418            let mut buf = vec![];
419            data.read_to_end(&mut buf)?;
420            Self::from_vec_blocking(buf)
421        }
422    }
423    #[cfg(ipc)]
424    fn from_read_blocking_ipc(data: &mut dyn io::Read) -> io::Result<Self> {
425        let mut buf = vec![0u8; Self::INLINE_MAX + 1];
426        let mut len = 0;
427
428        // INLINE_MAX read
429        loop {
430            match data.read(&mut buf[len..]) {
431                Ok(l) => {
432                    if l == 0 {
433                        // is <= INLINE_MAX
434                        buf.truncate(len);
435                        return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
436                    } else {
437                        len += l;
438                        if len == Self::INLINE_MAX + 1 {
439                            // goto UNNAMED_MAX read
440                            break;
441                        }
442                    }
443                }
444                Err(e) => match e.kind() {
445                    io::ErrorKind::WouldBlock => continue,
446                    _ => return Err(e),
447                },
448            }
449        }
450
451        // UNNAMED_MAX read
452        buf.resize(Self::UNNAMED_MAX + 1, 0);
453        loop {
454            match data.read(&mut buf[len..]) {
455                Ok(l) => {
456                    if l == 0 {
457                        // is <= UNNAMED_MAX
458                        return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
459                    } else {
460                        len += l;
461                        if len == Self::UNNAMED_MAX + 1 {
462                            // goto named file loop
463                            break;
464                        }
465                    }
466                }
467                Err(e) => match e.kind() {
468                    io::ErrorKind::WouldBlock => continue,
469                    _ => return Err(e),
470                },
471            }
472        }
473
474        // named file copy
475        Self::new_memmap_blocking(|m| {
476            m.write_all(&buf)?;
477            io::copy(data, m)?;
478            Ok(())
479        })
480    }
481
482    /// Read `path` into shared memory.
483    pub fn from_path_blocking(path: &Path) -> io::Result<Self> {
484        let file = fs::File::open(path)?;
485        Self::from_file_blocking(file)
486    }
487    /// Read `file` into shared memory.
488    pub fn from_file_blocking(mut file: fs::File) -> io::Result<Self> {
489        #[cfg(ipc)]
490        {
491            let len = file.metadata()?.len();
492            if len <= Self::UNNAMED_MAX as u64 {
493                let mut buf = vec![0u8; len as usize];
494                file.read_exact(&mut buf)?;
495                Self::from_vec_blocking(buf)
496            } else {
497                Self::new_memmap_blocking(|m| {
498                    io::copy(&mut file, m)?;
499                    Ok(())
500                })
501            }
502        }
503        #[cfg(not(ipc))]
504        {
505            let mut buf = vec![];
506            file.read_to_end(&mut buf)?;
507            Self::from_vec_blocking(buf)
508        }
509    }
510
511    /// Create a memory mapped file.
512    ///
513    /// Note that the `from_` functions select optimized backing storage depending on data length, this function
514    /// always selects the slowest options, a file backed memory map.
515    #[cfg(ipc)]
516    pub fn new_memmap_blocking(write: impl FnOnce(&mut fs::File) -> io::Result<()>) -> io::Result<Self> {
517        let (name, mut file) = Self::create_memmap()?;
518        write(&mut file)?;
519        let mut permissions = file.metadata()?.permissions();
520        permissions.set_readonly(true);
521        #[cfg(unix)]
522        {
523            use std::os::unix::fs::PermissionsExt;
524            permissions.set_mode(0o400);
525        }
526        file.set_permissions(permissions)?;
527
528        drop(file);
529        let map = IpcMemMap::read(name, None)?;
530        Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
531    }
532    #[cfg(ipc)]
533    fn create_memmap() -> io::Result<(PathBuf, fs::File)> {
534        static MEMMAP_DIR: Mutex<usize> = Mutex::new(0);
535        let mut count = MEMMAP_DIR.lock();
536
537        if *count == 0 {
538            zng_env::on_process_exit(|_| {
539                IpcBytes::cleanup_memmap_storage();
540            });
541        }
542
543        let dir = zng_env::cache("zng-task-ipc-mem").join(std::process::id().to_string());
544        fs::create_dir_all(&dir)?;
545        let mut name = dir.join(count.to_string());
546        if *count < usize::MAX {
547            *count += 1;
548        } else {
549            // very cold path, in practice the running process will die long before this
550            for i in 0..usize::MAX {
551                name = dir.join(i.to_string());
552                if !name.exists() {
553                    break;
554                }
555            }
556            if name.exists() {
557                return Err(io::Error::new(io::ErrorKind::StorageFull, ""));
558            }
559        };
560
561        // read because some callers create a MmapMut
562        let file = fs::OpenOptions::new()
563            .create(true)
564            .read(true)
565            .write(true)
566            .truncate(true)
567            .open(&name)?;
568        Ok((name, file))
569    }
570    #[cfg(ipc)]
571    fn cleanup_memmap_storage() {
572        if let Ok(dir) = fs::read_dir(zng_env::cache("zng-task-ipc-mem")) {
573            let entries: Vec<_> = dir.flatten().map(|e| e.path()).collect();
574            for entry in entries {
575                if entry.is_dir() {
576                    fs::remove_dir_all(entry).ok();
577                }
578            }
579        }
580    }
581
582    /// Memory map an existing file.
583    ///
584    /// The `range` defines the slice of the `file` that will be mapped. Returns [`io::ErrorKind::UnexpectedEof`] if the file does not have enough bytes.
585    ///
586    /// # Safety
587    ///
588    /// Caller must ensure the `file` is not modified while all clones of the `IpcBytes` exists in the current process and others.
589    ///
590    /// Note that the safe [`new_memmap`] function assures safety by retaining a read lock (Windows) and restricting access rights (Unix)
591    /// so that the file data is as read-only as the static data in the current executable file.
592    ///
593    /// [`new_memmap`]: Self::new_memmap
594    #[cfg(ipc)]
595    pub unsafe fn open_memmap_blocking(file: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
596        let read_handle = fs::File::open(&file)?;
597        read_handle.lock_shared()?;
598        let len = read_handle.metadata()?.len();
599        if let Some(range) = &range
600            && len < range.end as u64
601        {
602            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "file length < range.end"));
603        }
604        // SAFETY: up to the caller.
605        let map = unsafe { memmap2::Mmap::map(&read_handle) }?;
606
607        let range = range.unwrap_or_else(|| 0..map.len());
608
609        Ok(Self(Arc::new(IpcBytesData::MemMap(IpcMemMap {
610            name: file,
611            range,
612            read_handle: Some(read_handle),
613            is_custom: true,
614            map: Some(map),
615        }))))
616    }
617}
618
619impl AsRef<[u8]> for IpcBytes {
620    fn as_ref(&self) -> &[u8] {
621        &self[..]
622    }
623}
624impl Default for IpcBytes {
625    fn default() -> Self {
626        Self::empty()
627    }
628}
629impl PartialEq for IpcBytes {
630    fn eq(&self, other: &Self) -> bool {
631        self.ptr_eq(other) || self[..] == other[..]
632    }
633}
634impl Eq for IpcBytes {}
635#[cfg(ipc)]
636impl IpcMemMap {
637    fn read(name: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
638        let read_handle = fs::File::open(&name)?;
639        read_handle.lock_shared()?;
640        // SAFETY: File is marked read-only and a read lock is held for it.
641        let map = unsafe { memmap2::Mmap::map(&read_handle) }?;
642
643        let range = range.unwrap_or_else(|| 0..map.len());
644
645        Ok(IpcMemMap {
646            name,
647            range,
648            is_custom: false,
649            read_handle: Some(read_handle),
650            map: Some(map),
651        })
652    }
653}
654#[cfg(ipc)]
655impl Serialize for IpcMemMap {
656    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
657    where
658        S: serde::Serializer,
659    {
660        (&self.name, self.range.clone()).serialize(serializer)
661    }
662}
663#[cfg(ipc)]
664impl<'de> Deserialize<'de> for IpcMemMap {
665    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
666    where
667        D: serde::Deserializer<'de>,
668    {
669        let (name, range) = <(PathBuf, ops::Range<usize>)>::deserialize(deserializer)?;
670        IpcMemMap::read(name, Some(range)).map_err(|e| serde::de::Error::custom(format!("cannot load ipc memory map file, {e}")))
671    }
672}
673#[cfg(ipc)]
674impl Drop for IpcMemMap {
675    fn drop(&mut self) {
676        self.map.take();
677        self.read_handle.take();
678        if !self.is_custom {
679            std::fs::remove_file(&self.name).ok();
680        }
681    }
682}
683
684impl Serialize for IpcBytes {
685    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
686    where
687        S: serde::Serializer,
688    {
689        #[cfg(ipc)]
690        {
691            if is_ipc_serialization() {
692                match &*self.0 {
693                    IpcBytesData::Heap(b) => serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&b[..])),
694                    IpcBytesData::AnonMemMap(b) => serializer.serialize_newtype_variant("IpcBytes", 1, "AnonMemMap", b),
695                    IpcBytesData::MemMap(b) => {
696                        // need to keep alive until other process is also holding it, so we send
697                        // a sender for the other process to signal received.
698                        let (sender, mut recv) = crate::channel::ipc_unbounded::<()>()
699                            .map_err(|e| serde::ser::Error::custom(format!("cannot serialize memmap bytes for ipc, {e}")))?;
700
701                        let r = serializer.serialize_newtype_variant("IpcBytes", 2, "MemMap", &(b, sender))?;
702                        let hold = self.clone();
703                        crate::spawn_wait(move || {
704                            if let Err(e) = recv.recv_blocking() {
705                                tracing::error!("IpcBytes memmap completion signal not received, {e}")
706                            }
707                            drop(hold);
708                        });
709                        Ok(r)
710                    }
711                }
712            } else {
713                serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&self[..]))
714            }
715        }
716        #[cfg(not(ipc))]
717        {
718            serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&self[..]))
719        }
720    }
721}
722impl<'de> Deserialize<'de> for IpcBytes {
723    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
724    where
725        D: serde::Deserializer<'de>,
726    {
727        #[derive(Deserialize)]
728        enum VariantId {
729            Heap,
730            #[cfg(ipc)]
731            AnonMemMap,
732            #[cfg(ipc)]
733            MemMap,
734        }
735
736        struct EnumVisitor;
737        impl<'de> serde::de::Visitor<'de> for EnumVisitor {
738            type Value = IpcBytes;
739
740            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
741                write!(f, "IpcBytes variant")
742            }
743
744            fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
745            where
746                A: serde::de::EnumAccess<'de>,
747            {
748                let (variant, access) = data.variant::<VariantId>()?;
749                match variant {
750                    VariantId::Heap => access.newtype_variant_seed(ByteSliceVisitor),
751                    #[cfg(ipc)]
752                    VariantId::AnonMemMap => Ok(IpcBytes(Arc::new(IpcBytesData::AnonMemMap(access.newtype_variant()?)))),
753                    #[cfg(ipc)]
754                    VariantId::MemMap => {
755                        let (memmap, mut completion_sender): (IpcMemMap, crate::channel::IpcSender<()>) = access.newtype_variant()?;
756                        completion_sender.send_blocking(()).map_err(|e| {
757                            serde::de::Error::custom(format!("cannot deserialize memmap bytes, completion signal failed, {e}"))
758                        })?;
759                        Ok(IpcBytes(Arc::new(IpcBytesData::MemMap(memmap))))
760                    }
761                }
762            }
763        }
764        struct ByteSliceVisitor;
765        impl<'de> serde::de::Visitor<'de> for ByteSliceVisitor {
766            type Value = IpcBytes;
767
768            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
769                write!(f, "byte buffer")
770            }
771
772            fn visit_borrowed_bytes<E>(self, v: &'de [u8]) -> Result<Self::Value, E>
773            where
774                E: serde::de::Error,
775            {
776                IpcBytes::from_slice_blocking(v).map_err(serde::de::Error::custom)
777            }
778
779            fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
780            where
781                E: serde::de::Error,
782            {
783                IpcBytes::from_slice_blocking(v).map_err(serde::de::Error::custom)
784            }
785
786            fn visit_byte_buf<E>(self, v: Vec<u8>) -> Result<Self::Value, E>
787            where
788                E: serde::de::Error,
789            {
790                IpcBytes::from_vec_blocking(v).map_err(serde::de::Error::custom)
791            }
792        }
793        impl<'de> serde::de::DeserializeSeed<'de> for ByteSliceVisitor {
794            type Value = IpcBytes;
795
796            fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
797            where
798                D: serde::Deserializer<'de>,
799            {
800                deserializer.deserialize_bytes(ByteSliceVisitor)
801            }
802        }
803
804        #[cfg(ipc)]
805        {
806            deserializer.deserialize_enum("IpcBytes", &["Heap", "AnonMemMap", "MemMap"], EnumVisitor)
807        }
808        #[cfg(not(ipc))]
809        {
810            deserializer.deserialize_enum("IpcBytes", &["Heap"], EnumVisitor)
811        }
812    }
813}
814
815/// Enables special serialization of memory mapped files for the `serialize` call.
816///
817/// IPC channels like [`IpcSender`] serialize messages inside this context to support [`IpcBytes`] fast memory map sharing across processes.
818///
819/// You can use the [`is_ipc_serialization`] to check if inside context.
820///
821/// [`IpcSender`]: super::IpcSender
822#[cfg(ipc)]
823pub fn with_ipc_serialization<R>(serialize: impl FnOnce() -> R) -> R {
824    let parent = IPC_SERIALIZATION.replace(true);
825    let _clean = RunOnDrop::new(|| IPC_SERIALIZATION.set(parent));
826    serialize()
827}
828
829/// Checks if is inside [`with_ipc_serialization`].
830#[cfg(ipc)]
831pub fn is_ipc_serialization() -> bool {
832    IPC_SERIALIZATION.get()
833}
834
835#[cfg(ipc)]
836thread_local! {
837    static IPC_SERIALIZATION: Cell<bool> = const { Cell::new(false) };
838}
839
840impl IpcBytes {
841    /// Create a weak in process reference.
842    ///
843    /// Note that the weak reference cannot upgrade if only another process holds a strong reference.
844    pub fn downgrade(&self) -> WeakIpcBytes {
845        WeakIpcBytes(Arc::downgrade(&self.0))
846    }
847}
848
849/// Weak reference to an in process [`IpcBytes`].
850pub struct WeakIpcBytes(Weak<IpcBytesData>);
851impl WeakIpcBytes {
852    /// Get strong reference if any exists in the process.
853    pub fn upgrade(&self) -> Option<IpcBytes> {
854        self.0.upgrade().map(IpcBytes)
855    }
856
857    /// Count of strong references in the process.
858    pub fn strong_count(&self) -> usize {
859        self.0.strong_count()
860    }
861}
862
863/// Represents an async [`IpcBytes`] writer.
864///
865/// Use [`IpcBytes::new_writer`] to start writing.
866pub struct IpcBytesWriter {
867    inner: blocking::Unblock<IpcBytesWriterBlocking>,
868}
869impl IpcBytesWriter {
870    /// Finish writing and move data to a shareable [`IpcBytes`].
871    pub async fn finish(self) -> std::io::Result<IpcBytes> {
872        let inner = self.inner.into_inner().await;
873        blocking::unblock(move || inner.finish()).await
874    }
875
876    /// Mode data to an exclusive mutable [`IpcBytes`] that can be further modified, but not resized.
877    pub async fn finish_mut(self) -> std::io::Result<IpcBytesMut> {
878        let inner = self.inner.into_inner().await;
879        blocking::unblock(move || inner.finish_mut()).await
880    }
881}
882impl crate::io::AsyncWrite for IpcBytesWriter {
883    fn poll_write(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8]) -> std::task::Poll<io::Result<usize>> {
884        crate::io::AsyncWrite::poll_write(Pin::new(&mut Pin::get_mut(self).inner), cx, buf)
885    }
886
887    fn poll_flush(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
888        crate::io::AsyncWrite::poll_flush(Pin::new(&mut Pin::get_mut(self).inner), cx)
889    }
890
891    fn poll_close(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
892        crate::io::AsyncWrite::poll_close(Pin::new(&mut Pin::get_mut(self).inner), cx)
893    }
894}
895impl crate::io::AsyncSeek for IpcBytesWriter {
896    fn poll_seek(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, pos: io::SeekFrom) -> std::task::Poll<io::Result<u64>> {
897        crate::io::AsyncSeek::poll_seek(Pin::new(&mut Pin::get_mut(self).inner), cx, pos)
898    }
899}
900
901/// Represents a blocking [`IpcBytes`] writer.
902///
903/// Use [`IpcBytes::new_writer_blocking`] to start writing.
904pub struct IpcBytesWriterBlocking {
905    #[cfg(ipc)]
906    heap_buf: Vec<u8>,
907    #[cfg(ipc)]
908    memmap: Option<(PathBuf, std::fs::File)>,
909
910    #[cfg(not(ipc))]
911    heap_buf: std::io::Cursor<Vec<u8>>,
912}
913impl IpcBytesWriterBlocking {
914    /// Finish writing and move data to a shareable [`IpcBytes`].
915    pub fn finish(self) -> std::io::Result<IpcBytes> {
916        let m = self.finish_mut()?;
917        m.finish_blocking()
918    }
919
920    /// Mode data to an exclusive mutable [`IpcBytes`] that can be further modified, but not resized.
921    pub fn finish_mut(mut self) -> std::io::Result<IpcBytesMut> {
922        self.flush()?;
923        #[cfg(ipc)]
924        {
925            let (len, inner) = match self.memmap {
926                Some((name, write_handle)) => {
927                    // SAFETY: we hold write lock
928                    let map = unsafe { memmap2::MmapMut::map_mut(&write_handle) }?;
929                    let len = map.len();
930                    (len, IpcBytesMutInner::MemMap { name, map, write_handle })
931                }
932                None => {
933                    let len = self.heap_buf.len();
934                    let i = if self.heap_buf.len() > IpcBytes::INLINE_MAX {
935                        IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_bytes(&self.heap_buf))
936                    } else {
937                        IpcBytesMutInner::Heap(self.heap_buf)
938                    };
939                    (len, i)
940                }
941            };
942            Ok(IpcBytesMut { len, inner })
943        }
944        #[cfg(not(ipc))]
945        {
946            let heap_buf = self.heap_buf.into_inner();
947            let len = heap_buf.len();
948            let inner = IpcBytesMutInner::Heap(heap_buf);
949            Ok(IpcBytesMut { len, inner })
950        }
951    }
952
953    #[cfg(ipc)]
954    fn alloc_memmap_file(&mut self) -> io::Result<()> {
955        if self.memmap.is_none() {
956            let (name, file) = IpcBytes::create_memmap()?;
957            file.lock()?;
958            #[cfg(unix)]
959            {
960                let mut permissions = file.metadata()?.permissions();
961                use std::os::unix::fs::PermissionsExt;
962                permissions.set_mode(0o600);
963                file.set_permissions(permissions)?;
964            }
965            self.memmap = Some((name, file));
966        }
967        let file = &mut self.memmap.as_mut().unwrap().1;
968
969        file.write_all(&self.heap_buf)?;
970        // already allocated UNNAMED_MAX, continue using it as a large buffer
971        self.heap_buf.clear();
972        Ok(())
973    }
974}
975impl std::io::Write for IpcBytesWriterBlocking {
976    fn write(&mut self, write_buf: &[u8]) -> io::Result<usize> {
977        #[cfg(ipc)]
978        {
979            if self.heap_buf.len() + write_buf.len() > IpcBytes::UNNAMED_MAX {
980                // write exceed heap buffer, convert to memmap or flush to existing memmap
981                self.alloc_memmap_file()?;
982
983                if write_buf.len() > IpcBytes::UNNAMED_MAX {
984                    // writing massive payload, skip buffer
985                    self.memmap.as_mut().unwrap().1.write_all(write_buf)?;
986                } else {
987                    self.heap_buf.extend_from_slice(write_buf);
988                }
989            } else {
990                if self.memmap.is_none() {
991                    // heap buffer not fully allocated yet, ensure we only allocate up to UNNAMED_MAX
992                    self.heap_buf
993                        .reserve_exact((self.heap_buf.capacity().max(1024) * 2).min(IpcBytes::UNNAMED_MAX));
994                }
995                self.heap_buf.extend_from_slice(write_buf);
996            }
997
998            Ok(write_buf.len())
999        }
1000
1001        #[cfg(not(ipc))]
1002        {
1003            std::io::Write::write(&mut self.heap_buf, write_buf)
1004        }
1005    }
1006
1007    fn flush(&mut self) -> io::Result<()> {
1008        #[cfg(ipc)]
1009        if let Some((_, file)) = &mut self.memmap {
1010            if !self.heap_buf.is_empty() {
1011                file.write_all(&self.heap_buf)?;
1012                self.heap_buf.clear();
1013            }
1014            file.flush()?;
1015        }
1016        Ok(())
1017    }
1018}
1019impl std::io::Seek for IpcBytesWriterBlocking {
1020    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
1021        #[cfg(ipc)]
1022        {
1023            self.alloc_memmap_file()?;
1024            let (_, file) = self.memmap.as_mut().unwrap();
1025            if !self.heap_buf.is_empty() {
1026                file.write_all(&self.heap_buf)?;
1027                self.heap_buf.clear();
1028            }
1029            file.seek(pos)
1030        }
1031        #[cfg(not(ipc))]
1032        {
1033            std::io::Seek::seek(&mut self.heap_buf, pos)
1034        }
1035    }
1036}
1037
1038enum IpcBytesMutInner {
1039    Heap(Vec<u8>),
1040    #[cfg(ipc)]
1041    AnonMemMap(IpcSharedMemory),
1042    #[cfg(ipc)]
1043    MemMap {
1044        name: PathBuf,
1045        map: memmap2::MmapMut,
1046        write_handle: std::fs::File,
1047    },
1048}
1049
1050/// Represents preallocated exclusive mutable memory for a new [`IpcBytes`].
1051///
1052/// Use [`IpcBytes::new_mut`] or [`IpcBytes::new_mut_blocking`] to allocate.
1053pub struct IpcBytesMut {
1054    inner: IpcBytesMutInner,
1055    len: usize,
1056}
1057impl ops::Deref for IpcBytesMut {
1058    type Target = [u8];
1059
1060    fn deref(&self) -> &Self::Target {
1061        let len = self.len;
1062        match &self.inner {
1063            IpcBytesMutInner::Heap(v) => &v[..len],
1064            #[cfg(ipc)]
1065            IpcBytesMutInner::AnonMemMap(m) => &m[..len],
1066            #[cfg(ipc)]
1067            IpcBytesMutInner::MemMap { map, .. } => &map[..len],
1068        }
1069    }
1070}
1071impl ops::DerefMut for IpcBytesMut {
1072    fn deref_mut(&mut self) -> &mut Self::Target {
1073        let len = self.len;
1074        match &mut self.inner {
1075            IpcBytesMutInner::Heap(v) => &mut v[..len],
1076            #[cfg(ipc)]
1077            IpcBytesMutInner::AnonMemMap(m) => {
1078                // SAFETY: we are the only reference to the map
1079                unsafe { m.deref_mut() }
1080            }
1081            #[cfg(ipc)]
1082            IpcBytesMutInner::MemMap { map, .. } => &mut map[..len],
1083        }
1084    }
1085}
1086impl fmt::Debug for IpcBytesMut {
1087    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1088        write!(f, "IpcBytesMut(<{} bytes>)", self.len())
1089    }
1090}
1091impl IpcBytesMut {
1092    /// Allocate zeroed mutable memory that can be written to and then converted to `IpcBytes` fast.
1093    pub async fn new(len: usize) -> io::Result<IpcBytesMut> {
1094        #[cfg(ipc)]
1095        if len <= IpcBytes::INLINE_MAX {
1096            Ok(IpcBytesMut {
1097                len,
1098                inner: IpcBytesMutInner::Heap(vec![0; len]),
1099            })
1100        } else if len <= IpcBytes::UNNAMED_MAX {
1101            Ok(IpcBytesMut {
1102                len,
1103                inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
1104            })
1105        } else {
1106            blocking::unblock(move || Self::new_blocking(len)).await
1107        }
1108
1109        #[cfg(not(ipc))]
1110        {
1111            Ok(IpcBytesMut {
1112                len,
1113                inner: IpcBytesMutInner::Heap(vec![0; len]),
1114            })
1115        }
1116    }
1117
1118    /// Allocate zeroed mutable memory that can be written to and then converted to `IpcBytes` fast.
1119    pub fn new_blocking(len: usize) -> io::Result<IpcBytesMut> {
1120        #[cfg(ipc)]
1121        if len <= IpcBytes::INLINE_MAX {
1122            Ok(IpcBytesMut {
1123                len,
1124                inner: IpcBytesMutInner::Heap(vec![0; len]),
1125            })
1126        } else if len <= IpcBytes::UNNAMED_MAX {
1127            Ok(IpcBytesMut {
1128                len,
1129                inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
1130            })
1131        } else {
1132            let (name, file) = IpcBytes::create_memmap()?;
1133            file.lock()?;
1134            #[cfg(unix)]
1135            {
1136                let mut permissions = file.metadata()?.permissions();
1137                use std::os::unix::fs::PermissionsExt;
1138                permissions.set_mode(0o600);
1139                file.set_permissions(permissions)?;
1140            }
1141            file.set_len(len as u64)?;
1142            // SAFETY: we hold write lock
1143            let map = unsafe { memmap2::MmapMut::map_mut(&file) }?;
1144            Ok(IpcBytesMut {
1145                len,
1146                inner: IpcBytesMutInner::MemMap {
1147                    name,
1148                    map,
1149                    write_handle: file,
1150                },
1151            })
1152        }
1153        #[cfg(not(ipc))]
1154        {
1155            Ok(IpcBytesMut {
1156                len,
1157                inner: IpcBytesMutInner::Heap(vec![0; len]),
1158            })
1159        }
1160    }
1161
1162    /// Uses `buf` or copies it to exclusive mutable memory.
1163    pub async fn from_vec(buf: Vec<u8>) -> io::Result<Self> {
1164        #[cfg(ipc)]
1165        if buf.len() <= IpcBytes::INLINE_MAX {
1166            Ok(Self {
1167                len: buf.len(),
1168                inner: IpcBytesMutInner::Heap(buf),
1169            })
1170        } else {
1171            blocking::unblock(move || {
1172                let mut b = IpcBytes::new_mut_blocking(buf.len())?;
1173                b[..].copy_from_slice(&buf);
1174                Ok(b)
1175            })
1176            .await
1177        }
1178        #[cfg(not(ipc))]
1179        {
1180            Ok(Self {
1181                len: buf.len(),
1182                inner: IpcBytesMutInner::Heap(buf),
1183            })
1184        }
1185    }
1186
1187    /// Convert to immutable shareable [`IpcBytes`].
1188    pub async fn finish(mut self) -> io::Result<IpcBytes> {
1189        let len = self.len;
1190        let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
1191            IpcBytesMutInner::Heap(mut v) => {
1192                v.truncate(len);
1193                v.shrink_to_fit();
1194                IpcBytesData::Heap(v)
1195            }
1196            #[cfg(ipc)]
1197            IpcBytesMutInner::AnonMemMap(m) => {
1198                if len < IpcBytes::INLINE_MAX {
1199                    IpcBytesData::Heap(m[..len].to_vec())
1200                } else if len < m.len() {
1201                    IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&m[..len]))
1202                } else {
1203                    IpcBytesData::AnonMemMap(m)
1204                }
1205            }
1206            #[cfg(ipc)]
1207            IpcBytesMutInner::MemMap { name, map, write_handle } => {
1208                let len = self.len;
1209                blocking::unblock(move || Self::finish_memmap(name, map, write_handle, len)).await?
1210            }
1211        };
1212        Ok(IpcBytes(Arc::new(data)))
1213    }
1214
1215    #[cfg(ipc)]
1216    fn finish_memmap(name: PathBuf, map: memmap2::MmapMut, write_handle: fs::File, len: usize) -> Result<IpcBytesData, io::Error> {
1217        let alloc_len = map.len();
1218        if alloc_len != len {
1219            write_handle.set_len(len as u64)?;
1220        }
1221        write_handle.unlock()?;
1222        let map = if alloc_len != len {
1223            drop(map);
1224            // SAFETY: we have write access to the file still
1225            unsafe { memmap2::Mmap::map(&write_handle) }?
1226        } else {
1227            map.make_read_only()?
1228        };
1229        let mut permissions = write_handle.metadata()?.permissions();
1230        permissions.set_readonly(true);
1231        #[cfg(unix)]
1232        {
1233            use std::os::unix::fs::PermissionsExt;
1234            permissions.set_mode(0o400);
1235        }
1236        write_handle.set_permissions(permissions)?;
1237        drop(write_handle);
1238        let read_handle = std::fs::File::open(&name)?;
1239        read_handle.lock_shared()?;
1240        Ok(IpcBytesData::MemMap(IpcMemMap {
1241            name,
1242            range: 0..len,
1243            is_custom: false,
1244            map: Some(map),
1245            read_handle: Some(read_handle),
1246        }))
1247    }
1248}
1249impl IpcBytesMut {
1250    /// Uses `buf` or copies it to exclusive mutable memory.
1251    pub fn from_vec_blocking(buf: Vec<u8>) -> io::Result<Self> {
1252        #[cfg(ipc)]
1253        if buf.len() <= IpcBytes::INLINE_MAX {
1254            Ok(Self {
1255                len: buf.len(),
1256                inner: IpcBytesMutInner::Heap(buf),
1257            })
1258        } else {
1259            let mut b = IpcBytes::new_mut_blocking(buf.len())?;
1260            b[..].copy_from_slice(&buf);
1261            Ok(b)
1262        }
1263        #[cfg(not(ipc))]
1264        {
1265            Ok(Self {
1266                len: buf.len(),
1267                inner: IpcBytesMutInner::Heap(buf),
1268            })
1269        }
1270    }
1271
1272    /// Convert to immutable shareable [`IpcBytes`].
1273    pub fn finish_blocking(mut self) -> io::Result<IpcBytes> {
1274        let len = self.len;
1275        let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
1276            IpcBytesMutInner::Heap(mut v) => {
1277                v.truncate(len);
1278                IpcBytesData::Heap(v)
1279            }
1280            #[cfg(ipc)]
1281            IpcBytesMutInner::AnonMemMap(m) => {
1282                if len < IpcBytes::INLINE_MAX {
1283                    IpcBytesData::Heap(m[..len].to_vec())
1284                } else if len < m.len() {
1285                    IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&m[..len]))
1286                } else {
1287                    IpcBytesData::AnonMemMap(m)
1288                }
1289            }
1290            #[cfg(ipc)]
1291            IpcBytesMutInner::MemMap { name, map, write_handle } => Self::finish_memmap(name, map, write_handle, len)?,
1292        };
1293        Ok(IpcBytes(Arc::new(data)))
1294    }
1295}
1296#[cfg(ipc)]
1297impl Drop for IpcBytesMut {
1298    fn drop(&mut self) {
1299        if let IpcBytesMutInner::MemMap { name, map, write_handle } = std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
1300            drop(map);
1301            drop(write_handle);
1302            std::fs::remove_file(name).ok();
1303        }
1304    }
1305}
1306
1307/// Safe bytemuck casting wrapper for [`IpcBytesMut`].
1308///
1309/// Use [`IpcBytesMut::cast`] to cast.
1310pub struct IpcBytesMutCast<T: bytemuck::AnyBitPattern> {
1311    bytes: IpcBytesMut,
1312    _t: PhantomData<T>,
1313}
1314impl<T: bytemuck::AnyBitPattern> ops::Deref for IpcBytesMutCast<T> {
1315    type Target = [T];
1316
1317    fn deref(&self) -> &Self::Target {
1318        bytemuck::cast_slice::<u8, T>(&self.bytes)
1319    }
1320}
1321impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> ops::DerefMut for IpcBytesMutCast<T> {
1322    fn deref_mut(&mut self) -> &mut Self::Target {
1323        bytemuck::cast_slice_mut::<u8, T>(&mut self.bytes)
1324    }
1325}
1326impl<T: bytemuck::AnyBitPattern> IpcBytesMutCast<T> {
1327    /// Convert back to [`IpcBytesMut`].
1328    pub fn into_inner(self) -> IpcBytesMut {
1329        self.bytes
1330    }
1331}
1332impl<T: bytemuck::AnyBitPattern> From<IpcBytesMutCast<T>> for IpcBytesMut {
1333    fn from(value: IpcBytesMutCast<T>) -> Self {
1334        value.bytes
1335    }
1336}
1337impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> IpcBytesMutCast<T> {
1338    /// Allocate zeroed mutable memory that can be written to and then converted to `IpcBytes` fast.
1339    pub async fn new(len: usize) -> io::Result<Self> {
1340        IpcBytesMut::new(len * size_of::<T>()).await.map(IpcBytesMut::cast)
1341    }
1342
1343    /// Allocate zeroed mutable memory that can be written to and then converted to `IpcBytes` fast.
1344    pub fn new_blocking(len: usize) -> io::Result<Self> {
1345        IpcBytesMut::new_blocking(len * size_of::<T>()).map(IpcBytesMut::cast)
1346    }
1347
1348    /// Uses `buf` or copies it to exclusive mutable memory.
1349    pub async fn from_vec(data: Vec<T>) -> io::Result<Self> {
1350        IpcBytesMut::from_vec(bytemuck::cast_vec(data)).await.map(IpcBytesMut::cast)
1351    }
1352
1353    /// Uses `buf` or copies it to exclusive mutable memory.
1354    pub fn from_vec_blocking(data: Vec<T>) -> io::Result<Self> {
1355        IpcBytesMut::from_vec_blocking(bytemuck::cast_vec(data)).map(IpcBytesMut::cast)
1356    }
1357
1358    /// Reference the underlying raw bytes.
1359    pub fn as_bytes(&mut self) -> &mut IpcBytesMut {
1360        &mut self.bytes
1361    }
1362
1363    /// Convert to immutable shareable [`IpcBytesCast`].
1364    pub async fn finish(self) -> io::Result<IpcBytesCast<T>> {
1365        self.bytes.finish().await.map(IpcBytes::cast)
1366    }
1367
1368    /// Convert to immutable shareable [`IpcBytesCast`].
1369    pub fn finish_blocking(self) -> io::Result<IpcBytesCast<T>> {
1370        self.bytes.finish_blocking().map(IpcBytes::cast)
1371    }
1372}
1373
1374impl IpcBytesMut {
1375    /// Safe bytemuck casting wrapper.
1376    ///
1377    /// The wrapper will deref to `[T]` and can be converted back to `IpcBytesMust`.
1378    ///
1379    /// # Panics
1380    ///
1381    /// Panics if cannot cast, se [bytemuck docs] for details.
1382    ///
1383    /// [bytemuck docs]: https://docs.rs/bytemuck/1.24.0/bytemuck/fn.try_cast_slice.html
1384    pub fn cast<T: bytemuck::AnyBitPattern>(self) -> IpcBytesMutCast<T> {
1385        let r = IpcBytesMutCast {
1386            bytes: self,
1387            _t: PhantomData,
1388        };
1389        let _assert = &r[..];
1390        r
1391    }
1392
1393    /// Safe bytemuck cast to slice.
1394    ///
1395    /// # Panics
1396    ///
1397    /// Panics if cannot cast, se [bytemuck docs] for details.
1398    ///
1399    /// [bytemuck docs]: https://docs.rs/bytemuck/1.24.0/bytemuck/fn.try_cast_slice.html
1400    pub fn cast_deref<T: bytemuck::AnyBitPattern>(&self) -> &[T] {
1401        bytemuck::cast_slice(self)
1402    }
1403
1404    /// Safe bytemuck cast to mutable slice.
1405    ///
1406    /// # Panics
1407    ///
1408    /// Panics if cannot cast, se [bytemuck docs] for details.
1409    ///
1410    /// [bytemuck docs]: https://docs.rs/bytemuck/1.24.0/bytemuck/fn.try_cast_slice.html
1411    pub fn cast_deref_mut<T: bytemuck::AnyBitPattern + bytemuck::NoUninit>(&mut self) -> &mut [T] {
1412        bytemuck::cast_slice_mut(self)
1413    }
1414}
1415
1416/// Safe bytemuck casting wrapper for [`IpcBytes`].
1417///
1418/// Use [`IpcBytes::cast`] to cast.
1419pub struct IpcBytesCast<T: bytemuck::AnyBitPattern> {
1420    bytes: IpcBytes,
1421    _t: PhantomData<T>,
1422}
1423impl<T: bytemuck::AnyBitPattern> Default for IpcBytesCast<T> {
1424    fn default() -> Self {
1425        Self {
1426            bytes: Default::default(),
1427            _t: PhantomData,
1428        }
1429    }
1430}
1431impl<T: bytemuck::AnyBitPattern> ops::Deref for IpcBytesCast<T> {
1432    type Target = [T];
1433
1434    fn deref(&self) -> &Self::Target {
1435        bytemuck::cast_slice::<u8, T>(&self.bytes)
1436    }
1437}
1438impl<T: bytemuck::AnyBitPattern> IpcBytesCast<T> {
1439    /// Convert back to [`IpcBytes`].
1440    pub fn into_inner(self) -> IpcBytes {
1441        self.bytes
1442    }
1443}
1444impl<T: bytemuck::AnyBitPattern> From<IpcBytesCast<T>> for IpcBytes {
1445    fn from(value: IpcBytesCast<T>) -> Self {
1446        value.bytes
1447    }
1448}
1449impl<T: bytemuck::AnyBitPattern> Clone for IpcBytesCast<T> {
1450    fn clone(&self) -> Self {
1451        Self {
1452            bytes: self.bytes.clone(),
1453            _t: PhantomData,
1454        }
1455    }
1456}
1457impl<T: bytemuck::AnyBitPattern> fmt::Debug for IpcBytesCast<T> {
1458    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1459        write!(f, "IpcBytesCast<{}>(<{} items>)", std::any::type_name::<T>(), self.len())
1460    }
1461}
1462impl<T: bytemuck::AnyBitPattern> serde::Serialize for IpcBytesCast<T> {
1463    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1464    where
1465        S: serde::Serializer,
1466    {
1467        self.bytes.serialize(serializer)
1468    }
1469}
1470impl<'de, T: bytemuck::AnyBitPattern> serde::Deserialize<'de> for IpcBytesCast<T> {
1471    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1472    where
1473        D: serde::Deserializer<'de>,
1474    {
1475        let bytes = IpcBytes::deserialize(deserializer)?;
1476        Ok(bytes.cast())
1477    }
1478}
1479impl<T: bytemuck::AnyBitPattern> PartialEq for IpcBytesCast<T> {
1480    fn eq(&self, other: &Self) -> bool {
1481        self.bytes == other.bytes
1482    }
1483}
1484impl<T: bytemuck::AnyBitPattern> Eq for IpcBytesCast<T> {}
1485impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> IpcBytesCast<T> {
1486    /// Allocate zeroed mutable memory that can be written to and then converted to `IpcBytesCast` fast.
1487    pub async fn new_mut(len: usize) -> io::Result<IpcBytesMutCast<T>> {
1488        IpcBytesMut::new(len * size_of::<T>()).await.map(IpcBytesMut::cast)
1489    }
1490
1491    /// Allocate zeroed mutable memory that can be written to and then converted to `IpcBytes` fast.
1492    pub fn new_mut_blocking(len: usize) -> io::Result<IpcBytesMutCast<T>> {
1493        IpcBytesMut::new_blocking(len * size_of::<T>()).map(IpcBytesMut::cast)
1494    }
1495
1496    /// Copy or move data from vector.
1497    pub async fn from_vec(data: Vec<T>) -> io::Result<Self> {
1498        IpcBytes::from_vec(bytemuck::cast_vec(data)).await.map(IpcBytes::cast)
1499    }
1500
1501    /// Copy data from the iterator.
1502    ///
1503    /// This is most efficient if the [`size_hint`] indicates an exact length (min equals max), otherwise this
1504    /// will collect to an [`IpcBytesWriter`] that can reallocate multiple times as the buffer grows.
1505    ///
1506    /// Note that if the iterator gives an exact length that is the maximum taken, if it ends early the smaller length
1507    /// is used, if it continues after the given maximum it is clipped.
1508    ///
1509    /// [`size_hint`]: Iterator::size_hint
1510    pub async fn from_iter(iter: impl Iterator<Item = T>) -> io::Result<Self> {
1511        #[cfg(ipc)]
1512        {
1513            let (min, max) = iter.size_hint();
1514            let l = size_of::<T>();
1515            let min = min * l;
1516            let max = max.map(|m| m * l);
1517            if let Some(max) = max {
1518                if max <= IpcBytes::INLINE_MAX {
1519                    return Self::from_vec(iter.collect()).await;
1520                } else if max == min {
1521                    let mut r = IpcBytes::new_mut(max).await?;
1522                    let mut actual_len = 0;
1523                    for (i, f) in r.chunks_exact_mut(l).zip(iter) {
1524                        i.copy_from_slice(bytemuck::bytes_of(&f));
1525                        actual_len += 1;
1526                    }
1527                    r.truncate(actual_len * l);
1528                    return r.finish().await.map(IpcBytes::cast);
1529                }
1530            }
1531
1532            let mut writer = IpcBytes::new_writer().await;
1533            for f in iter {
1534                writer.write_all(bytemuck::bytes_of(&f)).await?;
1535            }
1536            writer.finish().await.map(IpcBytes::cast)
1537        }
1538        #[cfg(not(ipc))]
1539        {
1540            Self::from_vec(iter.collect()).await
1541        }
1542    }
1543
1544    /// Copy or move data from vector.
1545    pub fn from_vec_blocking(data: Vec<T>) -> io::Result<Self> {
1546        IpcBytes::from_vec_blocking(bytemuck::cast_vec(data)).map(IpcBytes::cast)
1547    }
1548
1549    /// Copy data from slice.
1550    pub fn from_slice_blocking(data: &[T]) -> io::Result<Self> {
1551        IpcBytes::from_slice_blocking(bytemuck::cast_slice(data)).map(IpcBytes::cast)
1552    }
1553
1554    /// Copy data from the iterator.
1555    ///
1556    /// This is most efficient if the [`size_hint`] indicates an exact length (min equals max), otherwise this
1557    /// will collect to an [`IpcBytesWriterBlocking`] that can reallocate multiple times as the buffer grows.
1558    ///
1559    /// Note that if the iterator gives an exact length that is the maximum taken, if it ends early the smaller length
1560    /// is used, if it continues after the given maximum it is clipped.
1561    ///
1562    /// [`size_hint`]: Iterator::size_hint
1563    pub fn from_iter_blocking(mut iter: impl Iterator<Item = T>) -> io::Result<Self> {
1564        #[cfg(ipc)]
1565        {
1566            let (min, max) = iter.size_hint();
1567            let l = size_of::<T>();
1568            let min = min * l;
1569            let max = max.map(|m| m * l);
1570            if let Some(max) = max {
1571                if max <= IpcBytes::INLINE_MAX {
1572                    return Self::from_vec_blocking(iter.collect());
1573                } else if max == min {
1574                    let mut r = IpcBytes::new_mut_blocking(max)?;
1575                    let mut actual_len = 0;
1576                    for (i, f) in r.chunks_exact_mut(l).zip(&mut iter) {
1577                        i.copy_from_slice(bytemuck::bytes_of(&f));
1578                        actual_len += 1;
1579                    }
1580                    r.truncate(actual_len * l);
1581                    return r.finish_blocking().map(IpcBytes::cast);
1582                }
1583            }
1584
1585            let mut writer = IpcBytes::new_writer_blocking();
1586            for f in iter {
1587                writer.write_all(bytemuck::bytes_of(&f))?;
1588            }
1589            writer.finish().map(IpcBytes::cast)
1590        }
1591        #[cfg(not(ipc))]
1592        {
1593            Self::from_vec_blocking(iter.collect())
1594        }
1595    }
1596
1597    /// Reference the underlying raw bytes.
1598    pub fn as_bytes(&self) -> &IpcBytes {
1599        &self.bytes
1600    }
1601}
1602
1603impl IpcBytes {
1604    /// Safe bytemuck casting wrapper.
1605    ///
1606    /// The wrapper will deref to `[T]` and can be converted back to `IpcBytes`.
1607    ///
1608    /// # Panics
1609    ///
1610    /// Panics if cannot cast, se [bytemuck docs] for details.
1611    ///
1612    /// [bytemuck docs]: https://docs.rs/bytemuck/1.24.0/bytemuck/fn.try_cast_slice.html
1613    pub fn cast<T: bytemuck::AnyBitPattern>(self) -> IpcBytesCast<T> {
1614        let r = IpcBytesCast {
1615            bytes: self,
1616            _t: PhantomData,
1617        };
1618        let _assert = &r[..];
1619        r
1620    }
1621
1622    /// Safe bytemuck cast to slice.
1623    ///
1624    /// # Panics
1625    ///
1626    /// Panics if cannot cast, se [bytemuck docs] for details.
1627    ///
1628    /// [bytemuck docs]: https://docs.rs/bytemuck/1.24.0/bytemuck/fn.try_cast_slice.html
1629    pub fn cast_deref<T: bytemuck::AnyBitPattern>(&self) -> &[T] {
1630        bytemuck::cast_slice(self)
1631    }
1632}
1633
1634impl IpcBytesMut {
1635    /// Shorten the bytes length.
1636    ///
1637    /// If `new_len` is greater or equal to current length does nothing.
1638    ///
1639    /// Note that this does not affect memory allocation, the extra bytes are only dropped on finish.
1640    pub fn truncate(&mut self, new_len: usize) {
1641        self.len = self.len.min(new_len);
1642    }
1643
1644    /// Convert chunks of length `L0` to chunks of length `L1` where `L1 <= L0`.
1645    ///
1646    /// Reuses the same allocation for the new data, replacing in place. Truncates the final buffer to the new length.
1647    ///
1648    /// Note that no memory is released by this call as truncated is lazy and applied on finish.
1649    ///
1650    /// # Panics
1651    ///
1652    /// Panics if `L1 > L0` or if bytes length is not multiple of `L0`.
1653    pub fn reduce_in_place<const L0: usize, const L1: usize>(&mut self, mut reduce: impl FnMut([u8; L0]) -> [u8; L1]) {
1654        assert!(L1 <= L0);
1655
1656        let self_ = &mut self[..];
1657
1658        let len = self_.len();
1659        if len == 0 {
1660            return;
1661        }
1662        assert!(len.is_multiple_of(L0), "length must be multiple of L0");
1663
1664        let ptr = self_.as_mut_ptr();
1665        let mut write = 0usize;
1666        let mut read = 0usize;
1667
1668        // SAFETY: pointers stay inside slice, in_chunk and out_chunk copy never overlaps with slice.
1669        unsafe {
1670            while read < len {
1671                let mut in_chunk = MaybeUninit::<[u8; L0]>::uninit();
1672                std::ptr::copy_nonoverlapping(ptr.add(read), (*in_chunk.as_mut_ptr()).as_mut_ptr(), L0);
1673                read += L0;
1674
1675                let out_chunk = reduce(in_chunk.assume_init());
1676
1677                std::ptr::copy_nonoverlapping(out_chunk.as_ptr(), ptr.add(write), L1);
1678                write += L1;
1679            }
1680        }
1681
1682        self.truncate(write);
1683    }
1684
1685    /// Convert chunks of `in_chunk_len` to chunks of `out_chunk_buf.len()` where `out_chunk_buf.len() <= in_chunk_len`.
1686    ///
1687    /// Reuses the same allocation for the new data, replacing in place. Truncates the final buffer to the new length.
1688    ///
1689    /// Note that no memory is released by this call as truncated is lazy and applied on finish.
1690    ///
1691    /// # Panics
1692    ///
1693    /// Panics if `out_chunk_buf.len() > in_chunk_len` or if bytes length is not multiple of `in_chunk_len`.
1694    pub fn reduce_in_place_dyn(&mut self, in_chunk_len: usize, out_chunk_buf: &mut [u8], mut reduce: impl FnMut(&[u8], &mut [u8])) {
1695        assert!(out_chunk_buf.len() < in_chunk_len);
1696
1697        let self_ = &mut self[..];
1698
1699        let len = self_.len();
1700        if len == 0 {
1701            return;
1702        }
1703        assert!(len.is_multiple_of(in_chunk_len), "length must be multiple of in_chunk_len");
1704
1705        let ptr = self_.as_mut_ptr();
1706        let mut write = 0usize;
1707        let mut read = 0usize;
1708
1709        // SAFETY: pointers stay inside slice, in_chunk and out_chunk copy never overlaps with slice.
1710        unsafe {
1711            while read < len {
1712                reduce(std::slice::from_raw_parts(ptr.add(read), in_chunk_len), &mut *out_chunk_buf);
1713                read += in_chunk_len;
1714
1715                std::ptr::copy_nonoverlapping(out_chunk_buf.as_ptr(), ptr.add(write), out_chunk_buf.len());
1716                write += out_chunk_buf.len();
1717            }
1718        }
1719
1720        self.truncate(write);
1721    }
1722
1723    /// Convert chunks of length `L0` to chunks of length `L1` where `size_of::<T1>() * L1 <= size_of::<T0>() * L0`.
1724    ///
1725    /// Reuses the same allocation for the new data, replacing in place. Truncates the final buffer to the new length.
1726    ///
1727    /// Note that no memory is released by this call as truncated is lazy and applied on finish.
1728    ///
1729    /// # Panics
1730    ///
1731    /// Panics if `size_of::<T1>() * L1 > size_of::<T0>() * L0` or if bytes length is not multiple of `size_of::<T0>() * L0`.
1732    pub fn cast_reduce_in_place<T0, const L0: usize, T1, const L1: usize>(&mut self, mut reduce: impl FnMut([T0; L0]) -> [T1; L1])
1733    where
1734        T0: bytemuck::AnyBitPattern,
1735    {
1736        let l0 = std::mem::size_of::<T0>() * L0;
1737        let l1 = std::mem::size_of::<T1>() * L1;
1738        assert!(l1 <= l0);
1739
1740        let self_ = &mut self[..];
1741
1742        let len = self_.len();
1743        if len == 0 {
1744            return;
1745        }
1746        assert!(len.is_multiple_of(l0), "length must be multiple of size_of::<T0>() * L0");
1747
1748        let ptr = self_.as_mut_ptr();
1749        let mut write = 0usize;
1750        let mut read = 0usize;
1751
1752        // SAFETY:
1753        // Pointers stay inside slice, in_chunk and out_chunk copy never overlaps with slice.
1754        // Reading [T0; L0] from raw bytes is safe because T0: AnyBitPattern
1755        unsafe {
1756            while read < len {
1757                let mut in_chunk = MaybeUninit::<[T0; L0]>::uninit();
1758                std::ptr::copy_nonoverlapping(ptr.add(read), (*in_chunk.as_mut_ptr()).as_mut_ptr() as _, l0);
1759                read += l0;
1760
1761                let out_chunk = reduce(in_chunk.assume_init());
1762
1763                std::ptr::copy_nonoverlapping(out_chunk.as_ptr() as _, ptr.add(write), l1);
1764                write += l1;
1765            }
1766        }
1767
1768        self.truncate(write);
1769    }
1770
1771    /// Convert chunks of `size_of::<T0>() * in_chunk_len` to chunks of `size_of::<T1>() * out_chunk_buf.len()`
1772    /// where `size_of::<T1>() * out_chunk_buf.len() <= size_of::<T0>() * in_chunk_len`.
1773    ///
1774    /// Reuses the same allocation for the new data, replacing in place. Truncates the final buffer to the new length.
1775    ///
1776    /// Note that no memory is released by this call as truncated is lazy and applied on finish.
1777    ///
1778    /// # Panics
1779    ///
1780    /// Panics if `size_of::<T1>() * out_chunk_buf.len() > size_of::<T0>() * in_chunk_len` or if bytes
1781    /// length is not multiple of `size_of::<T0>() * in_chunk_len`.
1782    pub fn cast_reduce_in_place_dyn<T0, T1>(
1783        &mut self,
1784        in_chunk_len: usize,
1785        out_chunk_buf: &mut [T1],
1786        mut reduce: impl FnMut(&[T0], &mut [T1]),
1787    ) where
1788        T0: bytemuck::AnyBitPattern,
1789    {
1790        let l0 = std::mem::size_of::<T0>() * in_chunk_len;
1791        let l1 = std::mem::size_of_val(out_chunk_buf);
1792
1793        assert!(l1 <= l0);
1794
1795        let self_ = &mut self[..];
1796
1797        let len = self_.len();
1798        if len == 0 {
1799            return;
1800        }
1801        assert!(len.is_multiple_of(l0), "length must be multiple of size_of::<T0>() * in_chunk_len");
1802
1803        let ptr = self_.as_mut_ptr();
1804        let mut write = 0usize;
1805        let mut read = 0usize;
1806
1807        // SAFETY: pointers stay inside slice, in_chunk and out_chunk copy never overlaps with slice.
1808        unsafe {
1809            while read < len {
1810                reduce(
1811                    bytemuck::cast_slice(std::slice::from_raw_parts(ptr.add(read), l0)),
1812                    &mut *out_chunk_buf,
1813                );
1814                read += l0;
1815
1816                std::ptr::copy_nonoverlapping(out_chunk_buf.as_ptr() as _, ptr.add(write), l1);
1817                write += l1;
1818            }
1819        }
1820
1821        self.truncate(write);
1822    }
1823
1824    /// Reverses the order of chunks in the slice, in place.
1825    ///
1826    /// Chunk length is const L.
1827    ///
1828    /// # Panics
1829    ///
1830    /// Panics if length is not multiple of `L`.
1831    pub fn reverse_chunks<const L: usize>(&mut self) {
1832        let self_ = &mut self[..];
1833
1834        let len = self_.len();
1835
1836        if len == 0 || L == 0 {
1837            return;
1838        }
1839
1840        if L == 1 {
1841            return self_.reverse();
1842        }
1843
1844        assert!(len.is_multiple_of(L), "length must be multiple of L");
1845
1846        // SAFETY: already verified is multiple and already handled L=0
1847        unsafe { self_.as_chunks_unchecked_mut::<L>() }.reverse();
1848    }
1849
1850    /// Reverses the order of chunks in the slice, in place.
1851    ///
1852    /// # Panics
1853    ///
1854    /// Panics if length is not multiple of `chunk_len`.
1855    pub fn reverse_chunks_dyn(&mut self, chunk_len: usize) {
1856        let self_ = &mut self[..];
1857
1858        let len = self_.len();
1859
1860        if len == 0 || chunk_len == 0 {
1861            return;
1862        }
1863
1864        if chunk_len == 1 {
1865            return self_.reverse();
1866        }
1867
1868        assert!(len.is_multiple_of(chunk_len), "length must be multiple of chunk_len");
1869
1870        let mut a = 0;
1871        let mut b = len - chunk_len;
1872
1873        let ptr = self_.as_mut_ptr();
1874
1875        // SAFETY: chunks are not overlapping and loop stops before at mid, chunk_len > 1
1876        unsafe {
1877            while a < b {
1878                std::ptr::swap_nonoverlapping(ptr.add(a), ptr.add(b), chunk_len);
1879                a += chunk_len;
1880                b -= chunk_len;
1881            }
1882        }
1883    }
1884}
1885
1886// Slice iterator is very efficient, but it hold a reference, so we hold a self reference.
1887// The alternative to this is copying the unsafe code from std and adapting it or implementing
1888// a much slower index based iterator.
1889type SliceIter<'a> = std::slice::Iter<'a, u8>;
1890self_cell::self_cell! {
1891    struct IpcBytesIntoIterInner {
1892        owner: IpcBytes,
1893        #[covariant]
1894        dependent: SliceIter,
1895    }
1896}
1897
1898/// An [`IpcBytes`] iterator that holds a strong reference to it.
1899pub struct IpcBytesIntoIter(IpcBytesIntoIterInner);
1900impl IpcBytesIntoIter {
1901    fn new(bytes: IpcBytes) -> Self {
1902        Self(IpcBytesIntoIterInner::new(bytes, |b| b.iter()))
1903    }
1904
1905    /// The source bytes.
1906    pub fn source(&self) -> &IpcBytes {
1907        self.0.borrow_owner()
1908    }
1909
1910    /// Bytes not yet iterated.
1911    pub fn rest(&self) -> &[u8] {
1912        self.0.borrow_dependent().as_slice()
1913    }
1914}
1915impl Iterator for IpcBytesIntoIter {
1916    type Item = u8;
1917
1918    fn next(&mut self) -> Option<u8> {
1919        self.0.with_dependent_mut(|_, d| d.next().copied())
1920    }
1921
1922    fn size_hint(&self) -> (usize, Option<usize>) {
1923        self.0.borrow_dependent().size_hint()
1924    }
1925
1926    fn count(self) -> usize
1927    where
1928        Self: Sized,
1929    {
1930        self.0.borrow_dependent().as_slice().len()
1931    }
1932
1933    fn nth(&mut self, n: usize) -> Option<u8> {
1934        self.0.with_dependent_mut(|_, d| d.nth(n).copied())
1935    }
1936
1937    fn last(mut self) -> Option<Self::Item>
1938    where
1939        Self: Sized,
1940    {
1941        self.next_back()
1942    }
1943}
1944impl DoubleEndedIterator for IpcBytesIntoIter {
1945    fn next_back(&mut self) -> Option<Self::Item> {
1946        self.0.with_dependent_mut(|_, d| d.next_back().copied())
1947    }
1948
1949    fn nth_back(&mut self, n: usize) -> Option<Self::Item> {
1950        self.0.with_dependent_mut(|_, d| d.nth_back(n).copied())
1951    }
1952}
1953impl FusedIterator for IpcBytesIntoIter {}
1954impl Default for IpcBytesIntoIter {
1955    fn default() -> Self {
1956        IpcBytes::empty().into_iter()
1957    }
1958}
1959impl IntoIterator for IpcBytes {
1960    type Item = u8;
1961
1962    type IntoIter = IpcBytesIntoIter;
1963
1964    fn into_iter(self) -> Self::IntoIter {
1965        IpcBytesIntoIter::new(self)
1966    }
1967}
1968
1969/// An [`IpcBytesCast`] iterator that holds a strong reference to it.
1970pub struct IpcBytesCastIntoIter<T: bytemuck::AnyBitPattern>(IpcBytesIntoIter, IpcBytesCast<T>);
1971impl<T: bytemuck::AnyBitPattern> IpcBytesCastIntoIter<T> {
1972    fn new(bytes: IpcBytesCast<T>) -> Self {
1973        Self(bytes.bytes.clone().into_iter(), bytes)
1974    }
1975
1976    /// The source bytes.
1977    pub fn source(&self) -> &IpcBytesCast<T> {
1978        &self.1
1979    }
1980
1981    /// Items not yet iterated.
1982    pub fn rest(&self) -> &[T] {
1983        bytemuck::cast_slice(self.0.rest())
1984    }
1985}
1986impl<T: bytemuck::AnyBitPattern> Iterator for IpcBytesCastIntoIter<T> {
1987    type Item = T;
1988
1989    fn next(&mut self) -> Option<T> {
1990        let size = size_of::<T>();
1991        let r = *bytemuck::from_bytes(self.0.rest().get(..size)?);
1992        self.0.nth(size - 1);
1993        Some(r)
1994    }
1995
1996    fn size_hint(&self) -> (usize, Option<usize>) {
1997        let (mut min, mut max) = self.0.size_hint();
1998        min /= size_of::<T>();
1999        if let Some(max) = &mut max {
2000            *max /= size_of::<T>();
2001        }
2002        (min, max)
2003    }
2004
2005    fn nth(&mut self, n: usize) -> Option<T> {
2006        let size = size_of::<T>();
2007
2008        let byte_skip = n.checked_mul(size)?;
2009        let byte_end = byte_skip.checked_add(size)?;
2010
2011        let bytes = self.0.rest().get(byte_skip..byte_end)?;
2012        let r = *bytemuck::from_bytes(bytes);
2013
2014        self.0.nth(byte_end - 1);
2015
2016        Some(r)
2017    }
2018
2019    fn last(mut self) -> Option<Self::Item>
2020    where
2021        Self: Sized,
2022    {
2023        self.next_back()
2024    }
2025}
2026impl<T: bytemuck::AnyBitPattern> DoubleEndedIterator for IpcBytesCastIntoIter<T> {
2027    fn next_back(&mut self) -> Option<T> {
2028        let size = size_of::<T>();
2029
2030        let len = self.0.rest().len();
2031        if len < size {
2032            return None;
2033        }
2034
2035        let start = len - size;
2036        let bytes = &self.0.rest()[start..];
2037        let r = *bytemuck::from_bytes(bytes);
2038
2039        self.0.nth_back(size - 1);
2040
2041        Some(r)
2042    }
2043
2044    fn nth_back(&mut self, n: usize) -> Option<T> {
2045        let size = size_of::<T>();
2046
2047        let rev_byte_skip = n.checked_mul(size)?;
2048        let rev_byte_end = rev_byte_skip.checked_add(size)?;
2049        let len = self.0.rest().len();
2050
2051        if len < rev_byte_end {
2052            return None;
2053        }
2054
2055        let start = len - rev_byte_end;
2056        let end = len - rev_byte_skip;
2057
2058        let bytes = &self.0.rest()[start..end];
2059        let r = *bytemuck::from_bytes(bytes);
2060
2061        self.0.nth_back(rev_byte_end - 1);
2062
2063        Some(r)
2064    }
2065}
2066impl<T: bytemuck::AnyBitPattern> FusedIterator for IpcBytesCastIntoIter<T> {}
2067impl<T: bytemuck::AnyBitPattern> Default for IpcBytesCastIntoIter<T> {
2068    fn default() -> Self {
2069        IpcBytes::empty().cast::<T>().into_iter()
2070    }
2071}
2072impl<T: bytemuck::AnyBitPattern> IntoIterator for IpcBytesCast<T> {
2073    type Item = T;
2074
2075    type IntoIter = IpcBytesCastIntoIter<T>;
2076
2077    fn into_iter(self) -> Self::IntoIter {
2078        IpcBytesCastIntoIter::new(self)
2079    }
2080}