zng_task/channel/
ipc_bytes.rs

1#![cfg_attr(not(ipc), allow(unused))]
2
3use std::{
4    cell::Cell,
5    fmt, fs,
6    io::{self, Read, Write},
7    iter::FusedIterator,
8    marker::PhantomData,
9    mem::MaybeUninit,
10    ops,
11    path::{Path, PathBuf},
12    pin::Pin,
13    sync::{Arc, Weak},
14};
15
16use futures_lite::{AsyncReadExt, AsyncWriteExt as _};
17#[cfg(ipc)]
18use ipc_channel::ipc::IpcSharedMemory;
19use parking_lot::Mutex;
20use serde::{Deserialize, Serialize, de::VariantAccess};
21use zng_app_context::RunOnDrop;
22
/// Immutable bytes vector that can be shared fast over IPC.
///
/// # Memory Storage
///
/// All storage backends are held by an [`Arc`] pointer, so cloning in process is always very cheap.
///
/// The `from_*` constructor functions use different storage depending on byte length. Bytes <= 64KB are allocated in the heap
/// and are copied when shared with another process. Bytes <= 128MB are allocated in an anonymous memory map, only the system handle
/// is copied when shared with another process. Bytes > 128MB are allocated in a temporary file with restricted access and memory mapped
/// for read, only the file path and some metadata are copied when shared with another process.
///
/// Constructor functions for creating memory maps directly are also provided.
///
/// Note that in builds without the `"ipc"` crate feature only heap backend is available, in that case all data lengths are stored in the heap.
///
/// # Serialization
///
/// When serialized inside [`with_ipc_serialization`] the memory map bytes are not copied, only the system handle and metadata is serialized.
/// When serialized in other contexts all bytes are copied.
///
/// When deserializing memory map handles are reconnected and if deserializing bytes selects the best storage based on data length.
#[derive(Clone)]
#[repr(transparent)]
pub struct IpcBytes(Arc<IpcBytesData>);
/// Backing storage for [`IpcBytes`].
enum IpcBytesData {
    /// Heap vector, used for small data, bytes are copied when serialized for IPC.
    Heap(Vec<u8>),
    /// Anonymous shared memory, only the system handle crosses the process boundary.
    #[cfg(ipc)]
    AnonMemMap(IpcSharedMemory),
    /// File backed memory map, only the file path and metadata cross the process boundary.
    #[cfg(ipc)]
    MemMap(IpcMemMap),
    /// Memmap still being reconnected by a background task after deserialization.
    ///
    /// The `OnceLock` is set by the task spawned in the `Deserialize` impl; readers block on it.
    #[cfg(ipc)]
    MemMapAsyncDeserialize {
        map: std::sync::OnceLock<IpcMemMap>,
        // len copy to avoid OnceLock load blocking early in some common use cases (ImageEntry::is_loaded for example)
        len: usize,
    },
}
62impl fmt::Debug for IpcBytes {
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64        write!(f, "IpcBytes(<{} bytes>)", self.len())
65    }
66}
impl ops::Deref for IpcBytes {
    type Target = [u8];

    /// Dereferences to the byte slice, regardless of backing storage.
    fn deref(&self) -> &Self::Target {
        match &*self.0 {
            IpcBytesData::Heap(i) => i,
            #[cfg(ipc)]
            IpcBytesData::AnonMemMap(m) => m,
            #[cfg(ipc)]
            IpcBytesData::MemMap(f) => f,
            // blocks the calling thread until the background deserialization reconnect task sets the map
            #[cfg(ipc)]
            IpcBytesData::MemMapAsyncDeserialize { map, .. } => map.wait(),
        }
    }
}
82
83impl IpcBytes {
84    /// New empty.
85    pub fn empty() -> Self {
86        IpcBytes(Arc::new(IpcBytesData::Heap(vec![])))
87    }
88}
89/// Async constructors.
90impl IpcBytes {
91    /// Start a memory efficient async writer for creating a `IpcBytes` with unknown length.
92    pub async fn new_writer() -> IpcBytesWriter {
93        IpcBytesWriter {
94            inner: blocking::Unblock::new(Self::new_writer_blocking()),
95        }
96    }
97
98    /// Allocate zeroed mutable memory that can be written to and then converted to `IpcBytes` fast.
99    pub async fn new_mut(len: usize) -> io::Result<IpcBytesMut> {
100        IpcBytesMut::new(len).await
101    }
102
103    /// Copy or move data from vector.
104    pub async fn from_vec(data: Vec<u8>) -> io::Result<Self> {
105        blocking::unblock(move || Self::from_vec_blocking(data)).await
106    }
107
108    /// Copy data from the iterator.
109    ///
110    /// This is most efficient if the [`size_hint`] indicates an exact length (min equals max), otherwise this
111    /// will collect to an [`IpcBytesWriter`] that can reallocate multiple times as the buffer grows.
112    ///    
113    /// Note that if the iterator gives an exact length that is the maximum taken, if it ends early the smaller length
114    /// is used, if it continues after the given maximum it is clipped.
115    ///
116    /// [`size_hint`]: Iterator::size_hint
117    pub async fn from_iter(iter: impl Iterator<Item = u8>) -> io::Result<Self> {
118        #[cfg(ipc)]
119        {
120            let (min, max) = iter.size_hint();
121            if let Some(max) = max {
122                if max <= Self::INLINE_MAX {
123                    return Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))));
124                } else if max == min {
125                    let mut r = IpcBytes::new_mut(max).await?;
126                    let mut actual_len = 0;
127                    for (i, b) in r.iter_mut().zip(iter) {
128                        *i = b;
129                        actual_len += 1;
130                    }
131                    r.truncate(actual_len);
132                    return r.finish().await;
133                }
134            }
135
136            let mut writer = Self::new_writer().await;
137            for b in iter {
138                writer.write_all(&[b]).await?;
139            }
140            writer.finish().await
141        }
142
143        #[cfg(not(ipc))]
144        {
145            Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))))
146        }
147    }
148
149    /// Read `data` into shared memory.
150    pub async fn from_read(data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
151        #[cfg(ipc)]
152        {
153            Self::from_read_ipc(data).await
154        }
155        #[cfg(not(ipc))]
156        {
157            let mut data = data;
158            let mut buf = vec![];
159            data.read_to_end(&mut buf).await;
160            Self::from_vec(buf).await
161        }
162    }
163    #[cfg(ipc)]
164    async fn from_read_ipc(mut data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
165        let mut buf = vec![0u8; Self::INLINE_MAX + 1];
166        let mut len = 0;
167
168        // INLINE_MAX read
169        loop {
170            match data.read(&mut buf[len..]).await {
171                Ok(l) => {
172                    if l == 0 {
173                        // is <= INLINE_MAX
174                        buf.truncate(len);
175                        return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
176                    } else {
177                        len += l;
178                        if len == Self::INLINE_MAX + 1 {
179                            // goto UNNAMED_MAX read
180                            break;
181                        }
182                    }
183                }
184                Err(e) => match e.kind() {
185                    io::ErrorKind::WouldBlock => continue,
186                    _ => return Err(e),
187                },
188            }
189        }
190
191        // UNNAMED_MAX read
192        buf.resize(Self::UNNAMED_MAX + 1, 0);
193        loop {
194            match data.read(&mut buf[len..]).await {
195                Ok(l) => {
196                    if l == 0 {
197                        // is <= UNNAMED_MAX
198                        return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
199                    } else {
200                        len += l;
201                        if len == Self::UNNAMED_MAX + 1 {
202                            // goto named file loop
203                            break;
204                        }
205                    }
206                }
207                Err(e) => match e.kind() {
208                    io::ErrorKind::WouldBlock => continue,
209                    _ => return Err(e),
210                },
211            }
212        }
213
214        // named file copy
215        Self::new_memmap(async |m| {
216            use futures_lite::AsyncWriteExt as _;
217
218            m.write_all(&buf).await?;
219            crate::io::copy(data, m).await?;
220            Ok(())
221        })
222        .await
223    }
224
225    /// Read `path` into shared memory.
226    pub async fn from_path(path: PathBuf) -> io::Result<Self> {
227        let file = crate::fs::File::open(path).await?;
228        Self::from_file(file).await
229    }
230    /// Read `file` into shared memory.
231    pub async fn from_file(mut file: crate::fs::File) -> io::Result<Self> {
232        #[cfg(ipc)]
233        {
234            let len = file.metadata().await?.len();
235            if len <= Self::UNNAMED_MAX as u64 {
236                let mut buf = vec![0u8; len as usize];
237                file.read_exact(&mut buf).await?;
238                Self::from_vec_blocking(buf)
239            } else {
240                Self::new_memmap(async move |m| {
241                    crate::io::copy(&mut file, m).await?;
242                    Ok(())
243                })
244                .await
245            }
246        }
247        #[cfg(not(ipc))]
248        {
249            let mut buf = vec![];
250            file.read_to_end(&mut buf).await?;
251            Self::from_vec_blocking(buf)
252        }
253    }
254
255    /// Create a memory mapped file.
256    ///
257    /// Note that the `from_` functions select optimized backing storage depending on data length, this function
258    /// always selects the slowest options, a file backed memory map.
259    #[cfg(ipc)]
260    pub async fn new_memmap(write: impl AsyncFnOnce(&mut crate::fs::File) -> io::Result<()>) -> io::Result<Self> {
261        let (name, file) = blocking::unblock(Self::create_memmap).await?;
262        let mut file = crate::fs::File::from(file);
263        write(&mut file).await?;
264
265        let mut permissions = file.metadata().await?.permissions();
266        permissions.set_readonly(true);
267        #[cfg(unix)]
268        {
269            use std::os::unix::fs::PermissionsExt;
270            permissions.set_mode(0o400);
271        }
272        file.set_permissions(permissions).await?;
273
274        blocking::unblock(move || {
275            drop(file);
276            let map = IpcMemMap::read(name, None)?;
277            Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
278        })
279        .await
280    }
281
282    /// Memory map an existing file.
283    ///
284    /// The `range` defines the slice of the `file` that will be mapped. Returns [`io::ErrorKind::UnexpectedEof`] if the file does not have enough bytes.
285    ///
286    /// # Safety
287    ///
288    /// Caller must ensure the `file` is not modified while all clones of the `IpcBytes` exists in the current process and others.
289    ///
290    /// Note that the safe [`new_memmap`] function assures safety by retaining a read lock (Windows) and restricting access rights (Unix)
291    /// so that the file data is as read-only as the static data in the current executable file.
292    ///
293    /// [`new_memmap`]: Self::new_memmap
294    #[cfg(ipc)]
295    pub async unsafe fn open_memmap(file: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
296        blocking::unblock(move || {
297            // SAFETY: up to the caller
298            unsafe { Self::open_memmap_blocking(file, range) }
299        })
300        .await
301    }
302
303    /// Gets if both point to the same memory.
304    pub fn ptr_eq(&self, other: &Self) -> bool {
305        let a = &self[..];
306        let b = &other[..];
307        (std::ptr::eq(a, b) && a.len() == b.len()) || (a.is_empty() && b.is_empty())
308    }
309
310    #[cfg(ipc)]
311    const INLINE_MAX: usize = 64 * 1024; // 64KB
312    #[cfg(ipc)]
313    const UNNAMED_MAX: usize = 128 * 1024 * 1024; // 128MB
314}
315
/// Blocking constructors.
impl IpcBytes {
    /// Start a memory efficient blocking writer for creating a `IpcBytes` with unknown length.
    pub fn new_writer_blocking() -> IpcBytesWriterBlocking {
        IpcBytesWriterBlocking {
            #[cfg(ipc)]
            heap_buf: vec![],
            #[cfg(ipc)]
            memmap: None,

            #[cfg(not(ipc))]
            heap_buf: std::io::Cursor::new(vec![]),
        }
    }

    /// Allocate zeroed mutable memory that can be written to and then converted to `IpcBytes` fast.
    pub fn new_mut_blocking(len: usize) -> io::Result<IpcBytesMut> {
        IpcBytesMut::new_blocking(len)
    }

    /// Copy data from slice.
    pub fn from_slice_blocking(data: &[u8]) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            // storage selection by length: heap <= 64KB, anonymous shared memory <= 128MB,
            // file backed memory map above that
            if data.len() <= Self::INLINE_MAX {
                Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
            } else if data.len() <= Self::UNNAMED_MAX {
                Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(data)))))
            } else {
                Self::new_memmap_blocking(|m| m.write_all(data))
            }
        }
        #[cfg(not(ipc))]
        {
            Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
        }
    }

    /// Copy or move data from vector.
    pub fn from_vec_blocking(data: Vec<u8>) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            if data.len() <= Self::INLINE_MAX {
                // small enough for heap storage, move the vector in without copying
                Ok(Self(Arc::new(IpcBytesData::Heap(data))))
            } else {
                Self::from_slice_blocking(&data)
            }
        }
        #[cfg(not(ipc))]
        {
            Ok(Self(Arc::new(IpcBytesData::Heap(data))))
        }
    }

    /// Copy data from the iterator.
    ///
    /// This is most efficient if the [`size_hint`] indicates an exact length (min equals max), otherwise this
    /// will collect to an [`IpcBytesWriterBlocking`] that can reallocate multiple times as the buffer grows.
    ///
    /// Note that if the iterator gives an exact length that is the maximum taken, if it ends early the smaller length
    /// is used, if it continues after the given maximum it is clipped.
    ///
    /// [`size_hint`]: Iterator::size_hint
    pub fn from_iter_blocking(iter: impl Iterator<Item = u8>) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            let (min, max) = iter.size_hint();
            if let Some(max) = max {
                if max <= Self::INLINE_MAX {
                    // small enough for heap storage, collect directly
                    return Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))));
                } else if max == min {
                    // exact length known, write directly into pre-allocated shared memory
                    let mut r = IpcBytes::new_mut_blocking(max)?;
                    let mut actual_len = 0;
                    for (i, b) in r.iter_mut().zip(iter) {
                        *i = b;
                        actual_len += 1;
                    }
                    // the iterator may end before `max`, clip to the length actually written
                    r.truncate(actual_len);
                    return r.finish_blocking();
                }
            }

            // unknown length, stream into a growing writer
            let mut writer = Self::new_writer_blocking();
            for b in iter {
                writer.write_all(&[b])?;
            }
            writer.finish()
        }
        #[cfg(not(ipc))]
        {
            Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))))
        }
    }

    /// Read `data` into shared memory.
    pub fn from_read_blocking(data: &mut dyn io::Read) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            Self::from_read_blocking_ipc(data)
        }
        #[cfg(not(ipc))]
        {
            let mut buf = vec![];
            data.read_to_end(&mut buf)?;
            Self::from_vec_blocking(buf)
        }
    }
    #[cfg(ipc)]
    fn from_read_blocking_ipc(data: &mut dyn io::Read) -> io::Result<Self> {
        let mut buf = vec![0u8; Self::INLINE_MAX + 1];
        let mut len = 0;

        // INLINE_MAX read, if the data ends within this limit use heap storage
        loop {
            match data.read(&mut buf[len..]) {
                Ok(l) => {
                    if l == 0 {
                        // is <= INLINE_MAX
                        buf.truncate(len);
                        return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
                    } else {
                        len += l;
                        if len == Self::INLINE_MAX + 1 {
                            // goto UNNAMED_MAX read
                            break;
                        }
                    }
                }
                // NOTE(review): `WouldBlock` busy-retries, which assumes effectively blocking readers;
                // `Interrupted` is returned as an error instead of retried (`io::Read` convention is to
                // retry it, like `read_exact` does) — confirm intended
                Err(e) => match e.kind() {
                    io::ErrorKind::WouldBlock => continue,
                    _ => return Err(e),
                },
            }
        }

        // UNNAMED_MAX read, if the data ends within this limit use anonymous shared memory
        buf.resize(Self::UNNAMED_MAX + 1, 0);
        loop {
            match data.read(&mut buf[len..]) {
                Ok(l) => {
                    if l == 0 {
                        // is <= UNNAMED_MAX
                        return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
                    } else {
                        len += l;
                        if len == Self::UNNAMED_MAX + 1 {
                            // goto named file loop
                            break;
                        }
                    }
                }
                Err(e) => match e.kind() {
                    io::ErrorKind::WouldBlock => continue,
                    _ => return Err(e),
                },
            }
        }

        // named file copy, the buffered bytes plus whatever remains in the reader
        Self::new_memmap_blocking(|m| {
            m.write_all(&buf)?;
            io::copy(data, m)?;
            Ok(())
        })
    }

    /// Read `path` into shared memory.
    pub fn from_path_blocking(path: &Path) -> io::Result<Self> {
        let file = fs::File::open(path)?;
        Self::from_file_blocking(file)
    }
    /// Read `file` into shared memory.
    pub fn from_file_blocking(mut file: fs::File) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            // the file length selects the storage backend upfront
            let len = file.metadata()?.len();
            if len <= Self::UNNAMED_MAX as u64 {
                let mut buf = vec![0u8; len as usize];
                file.read_exact(&mut buf)?;
                Self::from_vec_blocking(buf)
            } else {
                Self::new_memmap_blocking(|m| {
                    io::copy(&mut file, m)?;
                    Ok(())
                })
            }
        }
        #[cfg(not(ipc))]
        {
            let mut buf = vec![];
            file.read_to_end(&mut buf)?;
            Self::from_vec_blocking(buf)
        }
    }

    /// Create a memory mapped file.
    ///
    /// Note that the `from_` functions select optimized backing storage depending on data length, this function
    /// always selects the slowest options, a file backed memory map.
    #[cfg(ipc)]
    pub fn new_memmap_blocking(write: impl FnOnce(&mut fs::File) -> io::Result<()>) -> io::Result<Self> {
        let (name, mut file) = Self::create_memmap()?;
        write(&mut file)?;
        // restrict the file to read-only so the mapped memory cannot be mutated behind our back
        let mut permissions = file.metadata()?.permissions();
        permissions.set_readonly(true);
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            permissions.set_mode(0o400);
        }
        file.set_permissions(permissions)?;

        // close the write handle before reopening for read + map
        drop(file);
        let map = IpcMemMap::read(name, None)?;
        Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
    }
    /// Create the backing file for a new memmap, named by a process-wide counter inside
    /// a per-process cache directory.
    #[cfg(ipc)]
    fn create_memmap() -> io::Result<(PathBuf, fs::File)> {
        // counter of memmap files created by this process; the lock also serializes directory setup
        static MEMMAP_DIR: Mutex<usize> = Mutex::new(0);
        let mut count = MEMMAP_DIR.lock();

        if *count == 0 {
            // cleanup any leftover after crash
            zng_env::on_process_exit(|_| {
                // cleanup own resources and any leftover of other instances
                cleanup_memmap_storage();
            });
        }

        let dir = zng_env::cache("zng-task-ipc-mem").join(std::process::id().to_string());
        fs::create_dir_all(&dir)?;
        let mut name = dir.join(count.to_string());
        if *count < usize::MAX {
            *count += 1;
        } else {
            // very cold path, in practice the running process will die long before this
            // counter saturated, linear scan for the first free name
            for i in 0..usize::MAX {
                name = dir.join(i.to_string());
                if !name.exists() {
                    break;
                }
            }
            // NOTE(review): error message is empty, consider a descriptive message
            if name.exists() {
                return Err(io::Error::new(io::ErrorKind::StorageFull, ""));
            }
        };

        let mut opt = fs::OpenOptions::new();
        opt.create(true).read(true).write(true).truncate(true);
        #[cfg(windows)]
        {
            use std::os::windows::fs::OpenOptionsExt as _;
            // hint to the OS that the file is short-lived, favoring cache over disk
            const FILE_ATTRIBUTE_TEMPORARY: u32 = 0x00000100;
            opt.attributes(FILE_ATTRIBUTE_TEMPORARY);
        }
        let file = opt.open(&name)?;
        Ok((name, file))
    }

    /// Memory map an existing file.
    ///
    /// The `range` defines the slice of the `file` that will be mapped. Returns [`io::ErrorKind::UnexpectedEof`] if the file does not have enough bytes.
    ///
    /// # Safety
    ///
    /// Caller must ensure the `file` is not modified while all clones of the `IpcBytes` exists in the current process and others.
    ///
    /// Note that the safe [`new_memmap`] function assures safety by retaining a read lock (Windows) and restricting access rights (Unix)
    /// so that the file data is as read-only as the static data in the current executable file.
    ///
    /// [`new_memmap`]: Self::new_memmap
    #[cfg(ipc)]
    pub unsafe fn open_memmap_blocking(file: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
        let read_handle = fs::File::open(&file)?;
        // shared lock held for the lifetime of the map (released when the handle drops)
        read_handle.lock_shared()?;
        let len = read_handle.metadata()?.len();
        if let Some(range) = &range
            && len < range.end as u64
        {
            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "file length < range.end"));
        }
        // SAFETY: up to the caller.
        let map = unsafe { memmap2::Mmap::map(&read_handle) }?;

        let range = range.unwrap_or_else(|| 0..map.len());

        // is_custom marks the file as caller managed, so it is not deleted on drop
        Ok(Self(Arc::new(IpcBytesData::MemMap(IpcMemMap {
            name: file,
            range,
            map: IpcMemMapData::Connected(map, read_handle),
            is_custom: true,
        }))))
    }
}
610
611impl IpcBytes {
612    /// Returns the number of bytes.
613    pub fn len(&self) -> usize {
614        match &*self.0 {
615            IpcBytesData::Heap(b) => b.len(),
616            #[cfg(ipc)]
617            IpcBytesData::AnonMemMap(b) => b.len(),
618            #[cfg(ipc)]
619            IpcBytesData::MemMap(b) => b.len(),
620            #[cfg(ipc)]
621            IpcBytesData::MemMapAsyncDeserialize { len, .. } => *len,
622        }
623    }
624
625    /// Returns `true` if contains no bytes.
626    pub fn is_empty(&self) -> bool {
627        match &*self.0 {
628            IpcBytesData::Heap(b) => b.is_empty(),
629            #[cfg(ipc)]
630            IpcBytesData::AnonMemMap(b) => b.is_empty(),
631            #[cfg(ipc)]
632            IpcBytesData::MemMap(b) => b.is_empty(),
633            #[cfg(ipc)]
634            IpcBytesData::MemMapAsyncDeserialize { len, .. } => *len == 0,
635        }
636    }
637}
638
/// If built with `"ipc"` feature removes all leftover memmap files of crashed processes.
///
/// Note that this is already called on normal process exit if any memmap was created. It is also called
/// by the crash handler process on startup after it spawns the app-process.
pub fn cleanup_memmap_storage() {
    #[cfg(ipc)]
    {
        let storage = zng_env::cache("zng-task-ipc-mem");
        if let Ok(read) = fs::read_dir(storage) {
            // snapshot entry paths before deleting, so removal does not race the directory iterator
            let paths: Vec<PathBuf> = read.flatten().map(|entry| entry.path()).collect();
            for path in paths {
                if path.is_dir() {
                    fs::remove_dir_all(path).ok();
                }
            }
        }
    }
}
654
655impl AsRef<[u8]> for IpcBytes {
656    fn as_ref(&self) -> &[u8] {
657        &self[..]
658    }
659}
660impl Default for IpcBytes {
661    fn default() -> Self {
662        Self::empty()
663    }
664}
665impl PartialEq for IpcBytes {
666    fn eq(&self, other: &Self) -> bool {
667        self.ptr_eq(other) || self[..] == other[..]
668    }
669}
670impl Eq for IpcBytes {}
#[cfg(ipc)]
struct IpcMemMap {
    // path of the backing file
    name: PathBuf,
    // slice of the mapped file exposed by `Deref`
    range: ops::Range<usize>,
    // `true` when the file is caller managed (`open_memmap*`), `false` when created by
    // `new_memmap*` and removed on drop
    is_custom: bool,
    map: IpcMemMapData,
}
/// Connection state of an [`IpcMemMap`].
#[cfg(ipc)]
enum IpcMemMapData {
    #[allow(unused)] // its holding the file handle
    Connected(memmap2::Mmap, fs::File),
    /// Placeholder while a background task reconnects the map after deserialization.
    AsyncDeserializing,
    /// Reconnect failed; the error panics on first read.
    AsyncDeserializeError(io::Error),
    /// Placeholder set during drop so the map and file handle release before file removal.
    Dropped,
}
#[cfg(ipc)]
impl ops::Deref for IpcMemMap {
    type Target = [u8];

    /// Dereferences to the mapped `range` slice.
    ///
    /// Panics if the post-deserialization reconnect failed.
    fn deref(&self) -> &[u8] {
        match &self.map {
            IpcMemMapData::Connected(mmap, _) => &mmap[self.range.clone()],
            IpcMemMapData::AsyncDeserializeError(e) => panic!("IpcBytes failed to reconnect with deserialized memmap file, {e}"),
            IpcMemMapData::AsyncDeserializing => unreachable!(), // only accessed from an OnceLock that locks until result is set
            IpcMemMapData::Dropped => unreachable!(),
        }
    }
}
#[cfg(ipc)]
impl IpcMemMap {
    /// Open `name` read-only, acquire a shared lock and memory map the file.
    fn read_map(name: &Path) -> io::Result<(memmap2::Mmap, fs::File)> {
        let read_handle = fs::File::open(name)?;
        read_handle.lock_shared()?;
        // SAFETY: File is marked read-only and a read lock is held for it.
        let map = unsafe { memmap2::Mmap::map(&read_handle) }?;

        Ok((map, read_handle))
    }

    /// Map the file at `name`; `range` defaults to the full mapped length.
    fn read(name: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
        let r = Self::read_map(&name)?;
        let range = range.unwrap_or_else(|| 0..r.0.len());
        Ok(IpcMemMap {
            name,
            range,
            is_custom: false,
            map: IpcMemMapData::Connected(r.0, r.1),
        })
    }

    /// Reconnect with the backing file after deserialization.
    ///
    /// On error stores `AsyncDeserializeError`, which panics on first read.
    fn finish_deserialize(&mut self) {
        self.map = match Self::read_map(&self.name) {
            Ok(r) => IpcMemMapData::Connected(r.0, r.1),
            Err(e) => IpcMemMapData::AsyncDeserializeError(e),
        };
    }
}
#[cfg(ipc)]
impl Serialize for IpcMemMap {
    /// Serializes only the file path and mapped range; the receiver reconnects by reopening the file.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let handle = (&self.name, self.range.clone());
        handle.serialize(serializer)
    }
}
#[cfg(ipc)]
impl<'de> Deserialize<'de> for IpcMemMap {
    /// Deserializes the file path and range only; the actual map is reconnected later
    /// by `finish_deserialize`, until then the state is `AsyncDeserializing`.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        type Handle = (PathBuf, ops::Range<usize>);
        let (name, range) = Handle::deserialize(deserializer)?;
        Ok(IpcMemMap {
            name,
            range,
            is_custom: false,
            map: IpcMemMapData::AsyncDeserializing,
        })
    }
}
#[cfg(ipc)]
impl Drop for IpcMemMap {
    fn drop(&mut self) {
        // replacing `map` first drops the mmap and file handle (releasing the shared lock)
        // before the file removal below
        self.map = IpcMemMapData::Dropped;
        if !self.is_custom {
            // files created by `new_memmap*` are temporary; custom files (`open_memmap*`) belong to the caller
            std::fs::remove_file(&self.name).ok();
        }
    }
}
761
impl Serialize for IpcBytes {
    /// Serializes as an `IpcBytes` enum with variants `Heap`, `AnonMemMap` and `MemMap`.
    ///
    /// Outside of `with_ipc_serialization` all bytes are always copied as the `Heap` variant.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        #[cfg(ipc)]
        {
            if is_ipc_serialization() {
                match &*self.0 {
                    // heap bytes are copied into the payload
                    IpcBytesData::Heap(b) => serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&b[..])),
                    // anonymous shared memory serializes only the system handle
                    IpcBytesData::AnonMemMap(b) => serializer.serialize_newtype_variant("IpcBytes", 1, "AnonMemMap", b),
                    IpcBytesData::MemMap(_) | IpcBytesData::MemMapAsyncDeserialize { .. } => {
                        let b = match &*self.0 {
                            IpcBytesData::MemMap(b) => b,
                            // blocks until the background reconnect task finishes
                            IpcBytesData::MemMapAsyncDeserialize { map, .. } => map.wait(),
                            _ => unreachable!(),
                        };
                        // need to keep alive until other process is also holding it, so we send
                        // a sender for the other process to signal received.
                        let (sender, mut recv) = crate::channel::ipc_unbounded::<()>()
                            .map_err(|e| serde::ser::Error::custom(format!("cannot serialize memmap bytes for ipc, {e}")))?;

                        let r = serializer.serialize_newtype_variant("IpcBytes", 2, "MemMap", &(b, sender))?;
                        // this clone keeps the memmap file alive until the receiver signals it reconnected
                        let hold = self.clone();
                        blocking::unblock(move || {
                            if let Err(e) = recv.recv_blocking() {
                                tracing::error!("IpcBytes memmap completion signal not received, {e}")
                            }
                            drop(hold);
                        })
                        .detach();
                        Ok(r)
                    }
                }
            } else {
                // not inside `with_ipc_serialization`, copy all bytes
                serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&self[..]))
            }
        }
        #[cfg(not(ipc))]
        {
            serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&self[..]))
        }
    }
}
impl<'de> Deserialize<'de> for IpcBytes {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Variant tags, must match the names/order emitted by the `Serialize` impl.
        // The memory map variants only exist in `ipc` builds.
        #[derive(Deserialize)]
        enum VariantId {
            Heap,
            #[cfg(ipc)]
            AnonMemMap,
            #[cfg(ipc)]
            MemMap,
        }

        struct EnumVisitor;
        impl<'de> serde::de::Visitor<'de> for EnumVisitor {
            type Value = IpcBytes;

            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                write!(f, "IpcBytes variant")
            }

            fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
            where
                A: serde::de::EnumAccess<'de>,
            {
                let (variant, access) = data.variant::<VariantId>()?;
                match variant {
                    // raw bytes, storage backend is re-selected by length
                    VariantId::Heap => access.newtype_variant_seed(ByteSliceVisitor),
                    // anonymous shared memory, only the system handle is transferred
                    #[cfg(ipc)]
                    VariantId::AnonMemMap => Ok(IpcBytes(Arc::new(IpcBytesData::AnonMemMap(access.newtype_variant()?)))),
                    // file backed memory map, reconnected asynchronously
                    #[cfg(ipc)]
                    VariantId::MemMap => {
                        let (mut memmap, mut completion_sender): (IpcMemMap, crate::channel::IpcSender<()>) = access.newtype_variant()?;

                        // return immediately with a placeholder that is filled by the
                        // background task below once the map is reconnected
                        let ipc_bytes = IpcBytes(Arc::new(IpcBytesData::MemMapAsyncDeserialize {
                            map: std::sync::OnceLock::new(),
                            len: memmap.range.len(),
                        }));
                        let ipc_bytes_sender = ipc_bytes.0.clone();
                        blocking::unblock(move || {
                            memmap.finish_deserialize();
                            // reconnect failures are deferred, they only surface when the bytes are first read
                            if let IpcMemMapData::AsyncDeserializeError(e) = &memmap.map {
                                tracing::error!("failed to reconnect with deserialized memmap file, will panic on first read, {e}");
                            }
                            // NOTE(review): presumably signals the serializing side that the
                            // file handle was taken over — confirm against the `Serialize` impl
                            if let Err(e) = completion_sender.send_blocking(()) {
                                tracing::error!("failed to send memmap deserialize completion signal, {e}")
                            }
                            match &*ipc_bytes_sender {
                                IpcBytesData::MemMapAsyncDeserialize { map, .. } => {
                                    // publish the reconnected map for readers
                                    map.get_or_init(|| memmap);
                                }
                                _ => unreachable!(),
                            }
                        })
                        .detach();

                        Ok(ipc_bytes)
                    }
                }
            }
        }
        // Visits the `Heap` payload bytes, selecting the best storage backend by
        // length via the `from_slice_blocking`/`from_vec_blocking` constructors.
        struct ByteSliceVisitor;
        impl<'de> serde::de::Visitor<'de> for ByteSliceVisitor {
            type Value = IpcBytes;

            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                write!(f, "byte buffer")
            }

            fn visit_borrowed_bytes<E>(self, v: &'de [u8]) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                IpcBytes::from_slice_blocking(v).map_err(serde::de::Error::custom)
            }

            fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                IpcBytes::from_slice_blocking(v).map_err(serde::de::Error::custom)
            }

            fn visit_byte_buf<E>(self, v: Vec<u8>) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                // already owned, the allocation can be reused
                IpcBytes::from_vec_blocking(v).map_err(serde::de::Error::custom)
            }
        }
        // Seed impl so the `Heap` payload can be visited as bytes directly, avoiding
        // an intermediate `Vec` when the format supports borrowed bytes.
        impl<'de> serde::de::DeserializeSeed<'de> for ByteSliceVisitor {
            type Value = IpcBytes;

            fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
            where
                D: serde::Deserializer<'de>,
            {
                deserializer.deserialize_bytes(ByteSliceVisitor)
            }
        }

        #[cfg(ipc)]
        {
            deserializer.deserialize_enum("IpcBytes", &["Heap", "AnonMemMap", "MemMap"], EnumVisitor)
        }
        #[cfg(not(ipc))]
        {
            deserializer.deserialize_enum("IpcBytes", &["Heap"], EnumVisitor)
        }
    }
}
918
/// Enables special serialization of memory mapped files for the `serialize` call.
///
/// IPC channels like [`IpcSender`] serialize messages inside this context to support [`IpcBytes`] fast memory map sharing across processes.
///
/// You can use the [`is_ipc_serialization`] to check if inside context.
///
/// [`IpcSender`]: super::IpcSender
#[cfg(ipc)]
pub fn with_ipc_serialization<R>(serialize: impl FnOnce() -> R) -> R {
    // set the thread-local flag, restoring the previous value even if `serialize` panics
    let previous = IPC_SERIALIZATION.replace(true);
    let _restore = RunOnDrop::new(move || IPC_SERIALIZATION.set(previous));
    serialize()
}
932
/// Checks if is inside [`with_ipc_serialization`].
#[cfg(ipc)]
pub fn is_ipc_serialization() -> bool {
    IPC_SERIALIZATION.with(Cell::get)
}
938
#[cfg(ipc)]
thread_local! {
    // Flag read by `is_ipc_serialization`; temporarily set to `true` by `with_ipc_serialization`.
    static IPC_SERIALIZATION: Cell<bool> = const { Cell::new(false) };
}
943
944impl IpcBytes {
945    /// Create a weak in process reference.
946    ///
947    /// Note that the weak reference cannot upgrade if only another process holds a strong reference.
948    pub fn downgrade(&self) -> WeakIpcBytes {
949        WeakIpcBytes(Arc::downgrade(&self.0))
950    }
951}
952
953/// Weak reference to an in process [`IpcBytes`].
954pub struct WeakIpcBytes(Weak<IpcBytesData>);
955impl WeakIpcBytes {
956    /// Get strong reference if any exists in the process.
957    pub fn upgrade(&self) -> Option<IpcBytes> {
958        self.0.upgrade().map(IpcBytes)
959    }
960
961    /// Count of strong references in the process.
962    pub fn strong_count(&self) -> usize {
963        self.0.strong_count()
964    }
965}
966
967/// Represents an async [`IpcBytes`] writer.
968///
969/// Use [`IpcBytes::new_writer`] to start writing.
970pub struct IpcBytesWriter {
971    inner: blocking::Unblock<IpcBytesWriterBlocking>,
972}
973impl IpcBytesWriter {
974    /// Finish writing and move data to a shareable [`IpcBytes`].
975    pub async fn finish(self) -> std::io::Result<IpcBytes> {
976        let inner = self.inner.into_inner().await;
977        blocking::unblock(move || inner.finish()).await
978    }
979
980    /// Mode data to an exclusive mutable [`IpcBytes`] that can be further modified, but not resized.
981    pub async fn finish_mut(self) -> std::io::Result<IpcBytesMut> {
982        let inner = self.inner.into_inner().await;
983        blocking::unblock(move || inner.finish_mut()).await
984    }
985}
986impl crate::io::AsyncWrite for IpcBytesWriter {
987    fn poll_write(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8]) -> std::task::Poll<io::Result<usize>> {
988        crate::io::AsyncWrite::poll_write(Pin::new(&mut Pin::get_mut(self).inner), cx, buf)
989    }
990
991    fn poll_flush(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
992        crate::io::AsyncWrite::poll_flush(Pin::new(&mut Pin::get_mut(self).inner), cx)
993    }
994
995    fn poll_close(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
996        crate::io::AsyncWrite::poll_close(Pin::new(&mut Pin::get_mut(self).inner), cx)
997    }
998}
999impl crate::io::AsyncSeek for IpcBytesWriter {
1000    fn poll_seek(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, pos: io::SeekFrom) -> std::task::Poll<io::Result<u64>> {
1001        crate::io::AsyncSeek::poll_seek(Pin::new(&mut Pin::get_mut(self).inner), cx, pos)
1002    }
1003}
1004
/// Represents a blocking [`IpcBytes`] writer.
///
/// Use [`IpcBytes::new_writer_blocking`] to start writing.
pub struct IpcBytesWriterBlocking {
    // in-heap write buffer; after data spills to the memmap file it is reused
    // as a batching buffer that is flushed into the file
    #[cfg(ipc)]
    heap_buf: Vec<u8>,
    // temp file path + write-locked handle; allocated lazily when the data
    // outgrows `IpcBytes::UNNAMED_MAX` or on the first seek
    #[cfg(ipc)]
    memmap: Option<(PathBuf, std::fs::File)>,

    // without the "ipc" feature all data stays in the heap
    #[cfg(not(ipc))]
    heap_buf: std::io::Cursor<Vec<u8>>,
}
impl IpcBytesWriterBlocking {
    /// Finish writing and move data to a shareable [`IpcBytes`].
    pub fn finish(self) -> std::io::Result<IpcBytes> {
        let m = self.finish_mut()?;
        m.finish_blocking()
    }

    /// Move data to an exclusive mutable [`IpcBytes`] that can be further modified, but not resized.
    pub fn finish_mut(mut self) -> std::io::Result<IpcBytesMut> {
        // push any buffered bytes into the memmap file, if one was allocated
        self.flush()?;
        #[cfg(ipc)]
        {
            let (len, inner) = match self.memmap {
                Some((name, write_handle)) => {
                    // SAFETY: we hold write lock
                    let map = unsafe { memmap2::MmapMut::map_mut(&write_handle) }?;
                    let len = map.len();
                    (len, IpcBytesMutInner::MemMap { name, map, write_handle })
                }
                None => {
                    // all data still in the heap buffer, pick heap or anonymous
                    // shared memory depending on the final length
                    let len = self.heap_buf.len();
                    let i = if self.heap_buf.len() > IpcBytes::INLINE_MAX {
                        IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_bytes(&self.heap_buf))
                    } else {
                        IpcBytesMutInner::Heap(self.heap_buf)
                    };
                    (len, i)
                }
            };
            Ok(IpcBytesMut { len, inner })
        }
        #[cfg(not(ipc))]
        {
            let heap_buf = self.heap_buf.into_inner();
            let len = heap_buf.len();
            let inner = IpcBytesMutInner::Heap(heap_buf);
            Ok(IpcBytesMut { len, inner })
        }
    }

    // Lazily creates the backing temp file (first spill or first seek), then
    // appends the current heap buffer to it and clears the buffer for reuse.
    #[cfg(ipc)]
    fn alloc_memmap_file(&mut self) -> io::Result<()> {
        if self.memmap.is_none() {
            let (name, file) = IpcBytes::create_memmap()?;
            // exclusive lock while we have write access
            file.lock()?;
            #[cfg(unix)]
            {
                // restrict the writable file to the owner
                let mut permissions = file.metadata()?.permissions();
                use std::os::unix::fs::PermissionsExt;
                permissions.set_mode(0o600);
                file.set_permissions(permissions)?;
            }
            self.memmap = Some((name, file));
        }
        let file = &mut self.memmap.as_mut().unwrap().1;

        file.write_all(&self.heap_buf)?;
        // already allocated UNNAMED_MAX, continue using it as a large buffer
        self.heap_buf.clear();
        Ok(())
    }
}
impl std::io::Write for IpcBytesWriterBlocking {
    fn write(&mut self, write_buf: &[u8]) -> io::Result<usize> {
        #[cfg(ipc)]
        {
            if self.heap_buf.len() + write_buf.len() > IpcBytes::UNNAMED_MAX {
                // write exceed heap buffer, convert to memmap or flush to existing memmap
                self.alloc_memmap_file()?;

                if write_buf.len() > IpcBytes::UNNAMED_MAX {
                    // writing massive payload, skip buffer
                    self.memmap.as_mut().unwrap().1.write_all(write_buf)?;
                } else {
                    // buffer was just emptied by `alloc_memmap_file`, so this fits
                    self.heap_buf.extend_from_slice(write_buf);
                }
            } else {
                if self.memmap.is_none() {
                    // heap buffer not fully allocated yet, ensure we only allocate up to UNNAMED_MAX
                    self.heap_buf
                        .reserve_exact((self.heap_buf.capacity().max(1024) * 2).min(IpcBytes::UNNAMED_MAX));
                }
                self.heap_buf.extend_from_slice(write_buf);
            }

            // never a partial write, all bytes are either buffered or in the file
            Ok(write_buf.len())
        }

        #[cfg(not(ipc))]
        {
            std::io::Write::write(&mut self.heap_buf, write_buf)
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        // only needs to do work after data has spilled into a memmap file
        #[cfg(ipc)]
        if let Some((_, file)) = &mut self.memmap {
            if !self.heap_buf.is_empty() {
                file.write_all(&self.heap_buf)?;
                self.heap_buf.clear();
            }
            file.flush()?;
        }
        Ok(())
    }
}
impl std::io::Seek for IpcBytesWriterBlocking {
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        #[cfg(ipc)]
        {
            // seeking requires file backed storage, the heap buffer is append only
            self.alloc_memmap_file()?;
            let (_, file) = self.memmap.as_mut().unwrap();
            // `alloc_memmap_file` already flushed the buffer, this is a defensive no-op
            if !self.heap_buf.is_empty() {
                file.write_all(&self.heap_buf)?;
                self.heap_buf.clear();
            }
            file.seek(pos)
        }
        #[cfg(not(ipc))]
        {
            std::io::Seek::seek(&mut self.heap_buf, pos)
        }
    }
}
1141
// Storage backend for `IpcBytesMut`, writable counterpart of `IpcBytesData`.
enum IpcBytesMutInner {
    // plain heap vector, used for lengths <= `IpcBytes::INLINE_MAX`
    Heap(Vec<u8>),
    // anonymous shared memory, `ipc` builds only
    #[cfg(ipc)]
    AnonMemMap(IpcSharedMemory),
    // temp-file backed mutable mapping, `ipc` builds only
    #[cfg(ipc)]
    MemMap {
        // temp file path, removed by `Drop` if the bytes are not finished
        name: PathBuf,
        map: memmap2::MmapMut,
        // exclusively locked handle that retains write access
        write_handle: std::fs::File,
    },
}
1153
/// Represents preallocated exclusive mutable memory for a new [`IpcBytes`].
///
/// Use [`IpcBytes::new_mut`] or [`IpcBytes::new_mut_blocking`] to allocate.
pub struct IpcBytesMut {
    inner: IpcBytesMutInner,
    // logical length, can be less than the underlying allocation after truncation
    len: usize,
}
impl ops::Deref for IpcBytesMut {
    type Target = [u8];

    // Derefs to the logical byte range (`..len`), not the full allocation.
    fn deref(&self) -> &Self::Target {
        let len = self.len;
        match &self.inner {
            IpcBytesMutInner::Heap(v) => &v[..len],
            #[cfg(ipc)]
            IpcBytesMutInner::AnonMemMap(m) => &m[..len],
            #[cfg(ipc)]
            IpcBytesMutInner::MemMap { map, .. } => &map[..len],
        }
    }
}
1175impl ops::DerefMut for IpcBytesMut {
1176    fn deref_mut(&mut self) -> &mut Self::Target {
1177        let len = self.len;
1178        match &mut self.inner {
1179            IpcBytesMutInner::Heap(v) => &mut v[..len],
1180            #[cfg(ipc)]
1181            IpcBytesMutInner::AnonMemMap(m) => {
1182                // SAFETY: we are the only reference to the map
1183                unsafe { m.deref_mut() }
1184            }
1185            #[cfg(ipc)]
1186            IpcBytesMutInner::MemMap { map, .. } => &mut map[..len],
1187        }
1188    }
1189}
1190impl fmt::Debug for IpcBytesMut {
1191    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1192        write!(f, "IpcBytesMut(<{} bytes>)", self.len())
1193    }
1194}
impl IpcBytesMut {
    /// Allocate zeroed mutable memory that can be written to and then converted to `IpcBytes` fast.
    ///
    /// Storage is selected by `len`: heap up to `INLINE_MAX`, anonymous shared memory up to
    /// `UNNAMED_MAX`, otherwise a temp file allocated in the blocking thread pool.
    pub async fn new(len: usize) -> io::Result<IpcBytesMut> {
        #[cfg(ipc)]
        if len <= IpcBytes::INLINE_MAX {
            // small, plain heap vector
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        } else if len <= IpcBytes::UNNAMED_MAX {
            // medium, anonymous shared memory
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
            })
        } else {
            // large, temp file; file creation does blocking I/O
            blocking::unblock(move || Self::new_blocking(len)).await
        }

        #[cfg(not(ipc))]
        {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        }
    }

    /// Allocate zeroed mutable memory that can be written to and then converted to `IpcBytes` fast.
    pub fn new_blocking(len: usize) -> io::Result<IpcBytesMut> {
        #[cfg(ipc)]
        if len <= IpcBytes::INLINE_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        } else if len <= IpcBytes::UNNAMED_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
            })
        } else {
            let (name, file) = IpcBytes::create_memmap()?;
            // exclusive lock while we have write access
            file.lock()?;
            #[cfg(unix)]
            {
                // restrict the writable file to the owner
                let mut permissions = file.metadata()?.permissions();
                use std::os::unix::fs::PermissionsExt;
                permissions.set_mode(0o600);
                file.set_permissions(permissions)?;
            }
            file.set_len(len as u64)?;
            // SAFETY: we hold write lock
            let map = unsafe { memmap2::MmapMut::map_mut(&file) }?;
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::MemMap {
                    name,
                    map,
                    write_handle: file,
                },
            })
        }
        #[cfg(not(ipc))]
        {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        }
    }

    /// Uses `buf` or copies it to exclusive mutable memory.
    pub async fn from_vec(buf: Vec<u8>) -> io::Result<Self> {
        #[cfg(ipc)]
        if buf.len() <= IpcBytes::INLINE_MAX {
            // small enough to reuse the heap allocation directly
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        } else {
            // larger payloads are copied into shared memory (or a file) in the blocking pool
            blocking::unblock(move || {
                let mut b = IpcBytes::new_mut_blocking(buf.len())?;
                b[..].copy_from_slice(&buf);
                Ok(b)
            })
            .await
        }
        #[cfg(not(ipc))]
        {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        }
    }

    /// Use or copy bytes to exclusive mutable memory.
    pub async fn from_bytes(bytes: IpcBytes) -> io::Result<Self> {
        blocking::unblock(move || Self::from_bytes_blocking(bytes)).await
    }

    /// Convert to immutable shareable [`IpcBytes`].
    pub async fn finish(mut self) -> io::Result<IpcBytes> {
        let len = self.len;
        // take `inner` so the `Drop` impl sees a heap variant and does not delete the backing file
        let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
            IpcBytesMutInner::Heap(mut v) => {
                v.truncate(len);
                v.shrink_to_fit();
                IpcBytesData::Heap(v)
            }
            #[cfg(ipc)]
            IpcBytesMutInner::AnonMemMap(m) => {
                if len < IpcBytes::INLINE_MAX {
                    // truncated below the inline threshold, demote to heap storage
                    IpcBytesData::Heap(m[..len].to_vec())
                } else if len < m.len() {
                    // truncated, reallocate shared memory at the final length
                    IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&m[..len]))
                } else {
                    IpcBytesData::AnonMemMap(m)
                }
            }
            #[cfg(ipc)]
            IpcBytesMutInner::MemMap { name, map, write_handle } => {
                let len = self.len;
                // truncation/remap does blocking file I/O
                blocking::unblock(move || Self::finish_memmap(name, map, write_handle, len)).await?
            }
        };
        Ok(IpcBytes(Arc::new(data)))
    }

    // Converts a writable file mapping into the final read-only shared form.
    #[cfg(ipc)]
    fn finish_memmap(name: PathBuf, map: memmap2::MmapMut, write_handle: fs::File, len: usize) -> Result<IpcBytesData, io::Error> {
        let alloc_len = map.len();
        if alloc_len != len {
            // data was truncated, shrink the file to match
            write_handle.set_len(len as u64)?;
        }
        write_handle.unlock()?;
        let map = if alloc_len != len {
            // length changed, the old map is stale, remap from scratch
            drop(map);
            // SAFETY: we have write access to the file still
            unsafe { memmap2::Mmap::map(&write_handle) }?
        } else {
            map.make_read_only()?
        };
        // mark the file itself read-only before releasing the write handle
        let mut permissions = write_handle.metadata()?.permissions();
        permissions.set_readonly(true);
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            permissions.set_mode(0o400);
        }
        write_handle.set_permissions(permissions)?;
        drop(write_handle);
        // reopen for read and hold a shared lock for the lifetime of the bytes
        let read_handle = std::fs::File::open(&name)?;
        read_handle.lock_shared()?;
        Ok(IpcBytesData::MemMap(IpcMemMap {
            name,
            range: 0..len,
            is_custom: false,
            map: IpcMemMapData::Connected(map, read_handle),
        }))
    }
}
1357impl IpcBytesMut {
1358    /// Uses `buf` or copies it to exclusive mutable memory.
1359    pub fn from_vec_blocking(buf: Vec<u8>) -> io::Result<Self> {
1360        #[cfg(ipc)]
1361        if buf.len() <= IpcBytes::INLINE_MAX {
1362            Ok(Self {
1363                len: buf.len(),
1364                inner: IpcBytesMutInner::Heap(buf),
1365            })
1366        } else {
1367            let mut b = IpcBytes::new_mut_blocking(buf.len())?;
1368            b[..].copy_from_slice(&buf);
1369            Ok(b)
1370        }
1371        #[cfg(not(ipc))]
1372        {
1373            Ok(Self {
1374                len: buf.len(),
1375                inner: IpcBytesMutInner::Heap(buf),
1376            })
1377        }
1378    }
1379
1380    /// Copy `buf` to exclusive mutable memory.
1381    pub fn from_slice_blocking(buf: &[u8]) -> io::Result<Self> {
1382        #[cfg(ipc)]
1383        if buf.len() <= IpcBytes::INLINE_MAX {
1384            Ok(Self {
1385                len: buf.len(),
1386                inner: IpcBytesMutInner::Heap(buf.to_vec()),
1387            })
1388        } else {
1389            let mut b = IpcBytes::new_mut_blocking(buf.len())?;
1390            b[..].copy_from_slice(buf);
1391            Ok(b)
1392        }
1393        #[cfg(not(ipc))]
1394        {
1395            Ok(Self {
1396                len: buf.len(),
1397                inner: IpcBytesMutInner::Heap(buf.to_vec()),
1398            })
1399        }
1400    }
1401
1402    /// Use or copy `bytes` to exclusive mutable memory.
1403    pub fn from_bytes_blocking(bytes: IpcBytes) -> io::Result<Self> {
1404        #[cfg_attr(not(ipc), allow(irrefutable_let_patterns))]
1405        if let IpcBytesData::Heap(_) = &*bytes.0 {
1406            match Arc::try_unwrap(bytes.0) {
1407                Ok(r) => match r {
1408                    IpcBytesData::Heap(r) => Ok(Self {
1409                        len: r.len(),
1410                        inner: IpcBytesMutInner::Heap(r),
1411                    }),
1412                    _ => unreachable!(),
1413                },
1414                Err(a) => Self::from_slice_blocking(&IpcBytes(a)[..]),
1415            }
1416        } else {
1417            Self::from_slice_blocking(&bytes[..])
1418        }
1419    }
1420
1421    /// Convert to immutable shareable [`IpcBytes`].
1422    pub fn finish_blocking(mut self) -> io::Result<IpcBytes> {
1423        let len = self.len;
1424        let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
1425            IpcBytesMutInner::Heap(mut v) => {
1426                v.truncate(len);
1427                IpcBytesData::Heap(v)
1428            }
1429            #[cfg(ipc)]
1430            IpcBytesMutInner::AnonMemMap(m) => {
1431                if len < IpcBytes::INLINE_MAX {
1432                    IpcBytesData::Heap(m[..len].to_vec())
1433                } else if len < m.len() {
1434                    IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&m[..len]))
1435                } else {
1436                    IpcBytesData::AnonMemMap(m)
1437                }
1438            }
1439            #[cfg(ipc)]
1440            IpcBytesMutInner::MemMap { name, map, write_handle } => Self::finish_memmap(name, map, write_handle, len)?,
1441        };
1442        Ok(IpcBytes(Arc::new(data)))
1443    }
1444}
#[cfg(ipc)]
impl Drop for IpcBytesMut {
    fn drop(&mut self) {
        // only the file backed variant needs cleanup: unmap, close, then delete the temp file
        let inner = std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![]));
        if let IpcBytesMutInner::MemMap { name, map, write_handle } = inner {
            drop(map);
            drop(write_handle);
            // best effort removal, errors are ignored
            let _ = std::fs::remove_file(name);
        }
    }
}
1455
1456/// Safe bytemuck casting wrapper for [`IpcBytesMut`].
1457///
1458/// Use [`IpcBytesMut::cast`] to cast.
1459pub struct IpcBytesMutCast<T: bytemuck::AnyBitPattern> {
1460    bytes: IpcBytesMut,
1461    _t: PhantomData<T>,
1462}
1463impl<T: bytemuck::AnyBitPattern> ops::Deref for IpcBytesMutCast<T> {
1464    type Target = [T];
1465
1466    fn deref(&self) -> &Self::Target {
1467        bytemuck::cast_slice::<u8, T>(&self.bytes)
1468    }
1469}
1470impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> ops::DerefMut for IpcBytesMutCast<T> {
1471    fn deref_mut(&mut self) -> &mut Self::Target {
1472        bytemuck::cast_slice_mut::<u8, T>(&mut self.bytes)
1473    }
1474}
1475impl<T: bytemuck::AnyBitPattern> IpcBytesMutCast<T> {
1476    /// Convert back to [`IpcBytesMut`].
1477    pub fn into_inner(self) -> IpcBytesMut {
1478        self.bytes
1479    }
1480}
1481impl<T: bytemuck::AnyBitPattern> From<IpcBytesMutCast<T>> for IpcBytesMut {
1482    fn from(value: IpcBytesMutCast<T>) -> Self {
1483        value.bytes
1484    }
1485}
1486impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> IpcBytesMutCast<T> {
1487    /// Allocate zeroed mutable memory that can be written to and then converted to `IpcBytes` fast.
1488    pub async fn new(len: usize) -> io::Result<Self> {
1489        IpcBytesMut::new(len * size_of::<T>()).await.map(IpcBytesMut::cast)
1490    }
1491
1492    /// Allocate zeroed mutable memory that can be written to and then converted to `IpcBytes` fast.
1493    pub fn new_blocking(len: usize) -> io::Result<Self> {
1494        IpcBytesMut::new_blocking(len * size_of::<T>()).map(IpcBytesMut::cast)
1495    }
1496
1497    /// Uses `buf` or copies it to exclusive mutable memory.
1498    pub async fn from_vec(data: Vec<T>) -> io::Result<Self> {
1499        IpcBytesMut::from_vec(bytemuck::cast_vec(data)).await.map(IpcBytesMut::cast)
1500    }
1501
1502    /// Uses `buf` or copies it to exclusive mutable memory.
1503    pub fn from_vec_blocking(data: Vec<T>) -> io::Result<Self> {
1504        IpcBytesMut::from_vec_blocking(bytemuck::cast_vec(data)).map(IpcBytesMut::cast)
1505    }
1506
1507    /// Copy data from slice.
1508    pub fn from_slice_blocking(data: &[T]) -> io::Result<Self> {
1509        IpcBytesMut::from_slice_blocking(bytemuck::cast_slice(data)).map(IpcBytesMut::cast)
1510    }
1511
1512    /// Reference the underlying raw bytes.
1513    pub fn as_bytes(&mut self) -> &mut IpcBytesMut {
1514        &mut self.bytes
1515    }
1516
1517    /// Convert to immutable shareable [`IpcBytesCast`].
1518    pub async fn finish(self) -> io::Result<IpcBytesCast<T>> {
1519        self.bytes.finish().await.map(IpcBytes::cast)
1520    }
1521
1522    /// Convert to immutable shareable [`IpcBytesCast`].
1523    pub fn finish_blocking(self) -> io::Result<IpcBytesCast<T>> {
1524        self.bytes.finish_blocking().map(IpcBytes::cast)
1525    }
1526}
1527
impl IpcBytesMut {
    /// Safe bytemuck casting wrapper.
    ///
    /// The wrapper will deref to `[T]` and can be converted back to `IpcBytesMut`.
    ///
    /// # Panics
    ///
    /// Panics if cannot cast, see [bytemuck docs] for details.
    ///
    /// [bytemuck docs]: https://docs.rs/bytemuck/1.24.0/bytemuck/fn.try_cast_slice.html
    pub fn cast<T: bytemuck::AnyBitPattern>(self) -> IpcBytesMutCast<T> {
        let r = IpcBytesMutCast {
            bytes: self,
            _t: PhantomData,
        };
        // force one deref so an invalid cast panics here instead of on first use
        let _assert = &r[..];
        r
    }

    /// Safe bytemuck cast to slice.
    ///
    /// # Panics
    ///
    /// Panics if cannot cast, see [bytemuck docs] for details.
    ///
    /// [bytemuck docs]: https://docs.rs/bytemuck/1.24.0/bytemuck/fn.try_cast_slice.html
    pub fn cast_deref<T: bytemuck::AnyBitPattern>(&self) -> &[T] {
        bytemuck::cast_slice(self)
    }

    /// Safe bytemuck cast to mutable slice.
    ///
    /// # Panics
    ///
    /// Panics if cannot cast, see [bytemuck docs] for details.
    ///
    /// [bytemuck docs]: https://docs.rs/bytemuck/1.24.0/bytemuck/fn.try_cast_slice.html
    pub fn cast_deref_mut<T: bytemuck::AnyBitPattern + bytemuck::NoUninit>(&mut self) -> &mut [T] {
        bytemuck::cast_slice_mut(self)
    }
}
1569
1570/// Safe bytemuck casting wrapper for [`IpcBytes`].
1571///
1572/// Use [`IpcBytes::cast`] to cast.
1573pub struct IpcBytesCast<T: bytemuck::AnyBitPattern> {
1574    bytes: IpcBytes,
1575    _t: PhantomData<T>,
1576}
1577impl<T: bytemuck::AnyBitPattern> Default for IpcBytesCast<T> {
1578    fn default() -> Self {
1579        Self {
1580            bytes: Default::default(),
1581            _t: PhantomData,
1582        }
1583    }
1584}
1585impl<T: bytemuck::AnyBitPattern> ops::Deref for IpcBytesCast<T> {
1586    type Target = [T];
1587
1588    fn deref(&self) -> &Self::Target {
1589        bytemuck::cast_slice::<u8, T>(&self.bytes)
1590    }
1591}
1592impl<T: bytemuck::AnyBitPattern> IpcBytesCast<T> {
1593    /// Convert back to [`IpcBytes`].
1594    pub fn into_inner(self) -> IpcBytes {
1595        self.bytes
1596    }
1597}
1598impl<T: bytemuck::AnyBitPattern> From<IpcBytesCast<T>> for IpcBytes {
1599    fn from(value: IpcBytesCast<T>) -> Self {
1600        value.bytes
1601    }
1602}
1603impl<T: bytemuck::AnyBitPattern> Clone for IpcBytesCast<T> {
1604    fn clone(&self) -> Self {
1605        Self {
1606            bytes: self.bytes.clone(),
1607            _t: PhantomData,
1608        }
1609    }
1610}
1611impl<T: bytemuck::AnyBitPattern> fmt::Debug for IpcBytesCast<T> {
1612    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1613        write!(f, "IpcBytesCast<{}>(<{} items>)", std::any::type_name::<T>(), self.len())
1614    }
1615}
1616impl<T: bytemuck::AnyBitPattern> serde::Serialize for IpcBytesCast<T> {
1617    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1618    where
1619        S: serde::Serializer,
1620    {
1621        self.bytes.serialize(serializer)
1622    }
1623}
1624impl<'de, T: bytemuck::AnyBitPattern> serde::Deserialize<'de> for IpcBytesCast<T> {
1625    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1626    where
1627        D: serde::Deserializer<'de>,
1628    {
1629        let bytes = IpcBytes::deserialize(deserializer)?;
1630        Ok(bytes.cast())
1631    }
1632}
1633impl<T: bytemuck::AnyBitPattern> PartialEq for IpcBytesCast<T> {
1634    fn eq(&self, other: &Self) -> bool {
1635        self.bytes == other.bytes
1636    }
1637}
1638impl<T: bytemuck::AnyBitPattern> Eq for IpcBytesCast<T> {}
1639impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> IpcBytesCast<T> {
1640    /// Allocate zeroed mutable memory that can be written to and then converted to `IpcBytesCast` fast.
1641    pub async fn new_mut(len: usize) -> io::Result<IpcBytesMutCast<T>> {
1642        IpcBytesMut::new(len * size_of::<T>()).await.map(IpcBytesMut::cast)
1643    }
1644
1645    /// Allocate zeroed mutable memory that can be written to and then converted to `IpcBytes` fast.
1646    pub fn new_mut_blocking(len: usize) -> io::Result<IpcBytesMutCast<T>> {
1647        IpcBytesMut::new_blocking(len * size_of::<T>()).map(IpcBytesMut::cast)
1648    }
1649
1650    /// Copy or move data from vector.
1651    pub async fn from_vec(data: Vec<T>) -> io::Result<Self> {
1652        IpcBytes::from_vec(bytemuck::cast_vec(data)).await.map(IpcBytes::cast)
1653    }
1654
    /// Copy data from the iterator.
    ///
    /// This is most efficient if the [`size_hint`] indicates an exact length (min equals max), otherwise this
    /// will collect to an [`IpcBytesWriter`] that can reallocate multiple times as the buffer grows.
    ///
    /// Note that if the iterator gives an exact length that is the maximum taken, if it ends early the smaller length
    /// is used, if it continues after the given maximum it is clipped.
    ///
    /// [`size_hint`]: Iterator::size_hint
    pub async fn from_iter(iter: impl Iterator<Item = T>) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            // convert the item count hints to byte lengths
            let (min, max) = iter.size_hint();
            let l = size_of::<T>();
            let min = min * l;
            let max = max.map(|m| m * l);
            if let Some(max) = max {
                if max <= IpcBytes::INLINE_MAX {
                    // small enough for the heap backend, collecting to a Vec is cheapest
                    return Self::from_vec(iter.collect()).await;
                } else if max == min {
                    // exact size hint, write items directly into a zeroed allocation
                    let mut r = IpcBytes::new_mut(max).await?;
                    let mut actual_len = 0;
                    // zip clips iterators that yield more than the hinted maximum
                    for (i, f) in r.chunks_exact_mut(l).zip(iter) {
                        i.copy_from_slice(bytemuck::bytes_of(&f));
                        actual_len += 1;
                    }
                    // if the iterator ended early only the written prefix is kept
                    r.truncate(actual_len * l);
                    return r.finish().await.map(IpcBytes::cast);
                }
            }

            // inexact size hint, collect through a growable writer
            let mut writer = IpcBytes::new_writer().await;
            for f in iter {
                writer.write_all(bytemuck::bytes_of(&f)).await?;
            }
            writer.finish().await.map(IpcBytes::cast)
        }
        #[cfg(not(ipc))]
        {
            // only the heap backend exists without the "ipc" feature
            Self::from_vec(iter.collect()).await
        }
    }
1697
1698    /// Copy or move data from vector.
1699    pub fn from_vec_blocking(data: Vec<T>) -> io::Result<Self> {
1700        IpcBytes::from_vec_blocking(bytemuck::cast_vec(data)).map(IpcBytes::cast)
1701    }
1702
1703    /// Copy data from slice.
1704    pub fn from_slice_blocking(data: &[T]) -> io::Result<Self> {
1705        IpcBytes::from_slice_blocking(bytemuck::cast_slice(data)).map(IpcBytes::cast)
1706    }
1707
    /// Copy data from the iterator.
    ///
    /// This is most efficient if the [`size_hint`] indicates an exact length (min equals max), otherwise this
    /// will collect to an [`IpcBytesWriterBlocking`] that can reallocate multiple times as the buffer grows.
    ///
    /// Note that if the iterator gives an exact length that is the maximum taken, if it ends early the smaller length
    /// is used, if it continues after the given maximum it is clipped.
    ///
    /// [`size_hint`]: Iterator::size_hint
    pub fn from_iter_blocking(mut iter: impl Iterator<Item = T>) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            // convert the item count hints to byte lengths
            let (min, max) = iter.size_hint();
            let l = size_of::<T>();
            let min = min * l;
            let max = max.map(|m| m * l);
            if let Some(max) = max {
                if max <= IpcBytes::INLINE_MAX {
                    // small enough for the heap backend, collecting to a Vec is cheapest
                    return Self::from_vec_blocking(iter.collect());
                } else if max == min {
                    // exact size hint, write items directly into a zeroed allocation
                    let mut r = IpcBytes::new_mut_blocking(max)?;
                    let mut actual_len = 0;
                    // zip clips iterators that yield more than the hinted maximum
                    for (i, f) in r.chunks_exact_mut(l).zip(&mut iter) {
                        i.copy_from_slice(bytemuck::bytes_of(&f));
                        actual_len += 1;
                    }
                    // if the iterator ended early only the written prefix is kept
                    r.truncate(actual_len * l);
                    return r.finish_blocking().map(IpcBytes::cast);
                }
            }

            // inexact size hint, collect through a growable writer
            let mut writer = IpcBytes::new_writer_blocking();
            for f in iter {
                writer.write_all(bytemuck::bytes_of(&f))?;
            }
            writer.finish().map(IpcBytes::cast)
        }
        #[cfg(not(ipc))]
        {
            // only the heap backend exists without the "ipc" feature
            Self::from_vec_blocking(iter.collect())
        }
    }
1750
1751    /// Reference the underlying raw bytes.
1752    pub fn as_bytes(&self) -> &IpcBytes {
1753        &self.bytes
1754    }
1755}
1756
1757impl IpcBytes {
1758    /// Safe bytemuck casting wrapper.
1759    ///
1760    /// The wrapper will deref to `[T]` and can be converted back to `IpcBytes`.
1761    ///
1762    /// # Panics
1763    ///
1764    /// Panics if cannot cast, se [bytemuck docs] for details.
1765    ///
1766    /// [bytemuck docs]: https://docs.rs/bytemuck/1.24.0/bytemuck/fn.try_cast_slice.html
1767    pub fn cast<T: bytemuck::AnyBitPattern>(self) -> IpcBytesCast<T> {
1768        let r = IpcBytesCast {
1769            bytes: self,
1770            _t: PhantomData,
1771        };
1772        let _assert = &r[..];
1773        r
1774    }
1775
1776    /// Safe bytemuck cast to slice.
1777    ///
1778    /// # Panics
1779    ///
1780    /// Panics if cannot cast, se [bytemuck docs] for details.
1781    ///
1782    /// [bytemuck docs]: https://docs.rs/bytemuck/1.24.0/bytemuck/fn.try_cast_slice.html
1783    pub fn cast_deref<T: bytemuck::AnyBitPattern>(&self) -> &[T] {
1784        bytemuck::cast_slice(self)
1785    }
1786}
1787
1788impl IpcBytesMut {
1789    /// Shorten the bytes length.
1790    ///
1791    /// If `new_len` is greater or equal to current length does nothing.
1792    ///
1793    /// Note that this does not affect memory allocation, the extra bytes are only dropped on finish.
1794    pub fn truncate(&mut self, new_len: usize) {
1795        self.len = self.len.min(new_len);
1796    }
1797
1798    /// Convert chunks of length `L0` to chunks of length `L1` where `L1 <= L0`.
1799    ///
1800    /// Reuses the same allocation for the new data, replacing in place. Truncates the final buffer to the new length.
1801    ///
1802    /// Note that no memory is released by this call as truncated is lazy and applied on finish.
1803    ///
1804    /// # Panics
1805    ///
1806    /// Panics if `L1 > L0` or if bytes length is not multiple of `L0`.
1807    pub fn reduce_in_place<const L0: usize, const L1: usize>(&mut self, mut reduce: impl FnMut([u8; L0]) -> [u8; L1]) {
1808        assert!(L1 <= L0);
1809
1810        let self_ = &mut self[..];
1811
1812        let len = self_.len();
1813        if len == 0 {
1814            return;
1815        }
1816        assert!(len.is_multiple_of(L0), "length must be multiple of L0");
1817
1818        let ptr = self_.as_mut_ptr();
1819        let mut write = 0usize;
1820        let mut read = 0usize;
1821
1822        // SAFETY: pointers stay inside slice, in_chunk and out_chunk copy never overlaps with slice.
1823        unsafe {
1824            while read < len {
1825                let mut in_chunk = MaybeUninit::<[u8; L0]>::uninit();
1826                std::ptr::copy_nonoverlapping(ptr.add(read), (*in_chunk.as_mut_ptr()).as_mut_ptr(), L0);
1827                read += L0;
1828
1829                let out_chunk = reduce(in_chunk.assume_init());
1830
1831                std::ptr::copy_nonoverlapping(out_chunk.as_ptr(), ptr.add(write), L1);
1832                write += L1;
1833            }
1834        }
1835
1836        self.truncate(write);
1837    }
1838
1839    /// Convert chunks of `in_chunk_len` to chunks of `out_chunk_buf.len()` where `out_chunk_buf.len() <= in_chunk_len`.
1840    ///
1841    /// Reuses the same allocation for the new data, replacing in place. Truncates the final buffer to the new length.
1842    ///
1843    /// Note that no memory is released by this call as truncated is lazy and applied on finish.
1844    ///
1845    /// # Panics
1846    ///
1847    /// Panics if `out_chunk_buf.len() > in_chunk_len` or if bytes length is not multiple of `in_chunk_len`.
1848    pub fn reduce_in_place_dyn(&mut self, in_chunk_len: usize, out_chunk_buf: &mut [u8], mut reduce: impl FnMut(&[u8], &mut [u8])) {
1849        assert!(out_chunk_buf.len() < in_chunk_len);
1850
1851        let self_ = &mut self[..];
1852
1853        let len = self_.len();
1854        if len == 0 {
1855            return;
1856        }
1857        assert!(len.is_multiple_of(in_chunk_len), "length must be multiple of in_chunk_len");
1858
1859        let ptr = self_.as_mut_ptr();
1860        let mut write = 0usize;
1861        let mut read = 0usize;
1862
1863        // SAFETY: pointers stay inside slice, in_chunk and out_chunk copy never overlaps with slice.
1864        unsafe {
1865            while read < len {
1866                reduce(std::slice::from_raw_parts(ptr.add(read), in_chunk_len), &mut *out_chunk_buf);
1867                read += in_chunk_len;
1868
1869                std::ptr::copy_nonoverlapping(out_chunk_buf.as_ptr(), ptr.add(write), out_chunk_buf.len());
1870                write += out_chunk_buf.len();
1871            }
1872        }
1873
1874        self.truncate(write);
1875    }
1876
1877    /// Convert chunks of length `L0` to chunks of length `L1` where `size_of::<T1>() * L1 <= size_of::<T0>() * L0`.
1878    ///
1879    /// Reuses the same allocation for the new data, replacing in place. Truncates the final buffer to the new length.
1880    ///
1881    /// Note that no memory is released by this call as truncated is lazy and applied on finish.
1882    ///
1883    /// # Panics
1884    ///
1885    /// Panics if `size_of::<T1>() * L1 > size_of::<T0>() * L0` or if bytes length is not multiple of `size_of::<T0>() * L0`.
1886    pub fn cast_reduce_in_place<T0, const L0: usize, T1, const L1: usize>(&mut self, mut reduce: impl FnMut([T0; L0]) -> [T1; L1])
1887    where
1888        T0: bytemuck::AnyBitPattern,
1889    {
1890        let l0 = std::mem::size_of::<T0>() * L0;
1891        let l1 = std::mem::size_of::<T1>() * L1;
1892        assert!(l1 <= l0);
1893
1894        let self_ = &mut self[..];
1895
1896        let len = self_.len();
1897        if len == 0 {
1898            return;
1899        }
1900        assert!(len.is_multiple_of(l0), "length must be multiple of size_of::<T0>() * L0");
1901
1902        let ptr = self_.as_mut_ptr();
1903        let mut write = 0usize;
1904        let mut read = 0usize;
1905
1906        // SAFETY:
1907        // Pointers stay inside slice, in_chunk and out_chunk copy never overlaps with slice.
1908        // Reading [T0; L0] from raw bytes is safe because T0: AnyBitPattern
1909        unsafe {
1910            while read < len {
1911                let mut in_chunk = MaybeUninit::<[T0; L0]>::uninit();
1912                std::ptr::copy_nonoverlapping(ptr.add(read), (*in_chunk.as_mut_ptr()).as_mut_ptr() as _, l0);
1913                read += l0;
1914
1915                let out_chunk = reduce(in_chunk.assume_init());
1916
1917                std::ptr::copy_nonoverlapping(out_chunk.as_ptr() as _, ptr.add(write), l1);
1918                write += l1;
1919            }
1920        }
1921
1922        self.truncate(write);
1923    }
1924
1925    /// Convert chunks of `size_of::<T0>() * in_chunk_len` to chunks of `size_of::<T1>() * out_chunk_buf.len()`
1926    /// where `size_of::<T1>() * out_chunk_buf.len() <= size_of::<T0>() * in_chunk_len`.
1927    ///
1928    /// Reuses the same allocation for the new data, replacing in place. Truncates the final buffer to the new length.
1929    ///
1930    /// Note that no memory is released by this call as truncated is lazy and applied on finish.
1931    ///
1932    /// # Panics
1933    ///
1934    /// Panics if `size_of::<T1>() * out_chunk_buf.len() > size_of::<T0>() * in_chunk_len` or if bytes
1935    /// length is not multiple of `size_of::<T0>() * in_chunk_len`.
1936    pub fn cast_reduce_in_place_dyn<T0, T1>(
1937        &mut self,
1938        in_chunk_len: usize,
1939        out_chunk_buf: &mut [T1],
1940        mut reduce: impl FnMut(&[T0], &mut [T1]),
1941    ) where
1942        T0: bytemuck::AnyBitPattern,
1943    {
1944        let l0 = std::mem::size_of::<T0>() * in_chunk_len;
1945        let l1 = std::mem::size_of_val(out_chunk_buf);
1946
1947        assert!(l1 <= l0);
1948
1949        let self_ = &mut self[..];
1950
1951        let len = self_.len();
1952        if len == 0 {
1953            return;
1954        }
1955        assert!(len.is_multiple_of(l0), "length must be multiple of size_of::<T0>() * in_chunk_len");
1956
1957        let ptr = self_.as_mut_ptr();
1958        let mut write = 0usize;
1959        let mut read = 0usize;
1960
1961        // SAFETY: pointers stay inside slice, in_chunk and out_chunk copy never overlaps with slice.
1962        unsafe {
1963            while read < len {
1964                reduce(
1965                    bytemuck::cast_slice(std::slice::from_raw_parts(ptr.add(read), l0)),
1966                    &mut *out_chunk_buf,
1967                );
1968                read += l0;
1969
1970                std::ptr::copy_nonoverlapping(out_chunk_buf.as_ptr() as _, ptr.add(write), l1);
1971                write += l1;
1972            }
1973        }
1974
1975        self.truncate(write);
1976    }
1977
1978    /// Reverses the order of chunks in the slice, in place.
1979    ///
1980    /// Chunk length is const L.
1981    ///
1982    /// # Panics
1983    ///
1984    /// Panics if length is not multiple of `L`.
1985    pub fn reverse_chunks<const L: usize>(&mut self) {
1986        let self_ = &mut self[..];
1987
1988        let len = self_.len();
1989
1990        if len == 0 || L == 0 {
1991            return;
1992        }
1993
1994        if L == 1 {
1995            return self_.reverse();
1996        }
1997
1998        assert!(len.is_multiple_of(L), "length must be multiple of L");
1999
2000        // SAFETY: already verified is multiple and already handled L=0
2001        unsafe { self_.as_chunks_unchecked_mut::<L>() }.reverse();
2002    }
2003
2004    /// Reverses the order of chunks in the slice, in place.
2005    ///
2006    /// # Panics
2007    ///
2008    /// Panics if length is not multiple of `chunk_len`.
2009    pub fn reverse_chunks_dyn(&mut self, chunk_len: usize) {
2010        let self_ = &mut self[..];
2011
2012        let len = self_.len();
2013
2014        if len == 0 || chunk_len == 0 {
2015            return;
2016        }
2017
2018        if chunk_len == 1 {
2019            return self_.reverse();
2020        }
2021
2022        assert!(len.is_multiple_of(chunk_len), "length must be multiple of chunk_len");
2023
2024        let mut a = 0;
2025        let mut b = len - chunk_len;
2026
2027        let ptr = self_.as_mut_ptr();
2028
2029        // SAFETY: chunks are not overlapping and loop stops before at mid, chunk_len > 1
2030        unsafe {
2031            while a < b {
2032                std::ptr::swap_nonoverlapping(ptr.add(a), ptr.add(b), chunk_len);
2033                a += chunk_len;
2034                b -= chunk_len;
2035            }
2036        }
2037    }
2038}
2039
// Slice iterator is very efficient, but it holds a reference, so we hold a self reference.
// The alternative to this is copying the unsafe code from std and adapting it or implementing
// a much slower index based iterator.
type SliceIter<'a> = std::slice::Iter<'a, u8>;
// Self-referential pair: `dependent` iterates the bytes owned by `owner`.
self_cell::self_cell! {
    struct IpcBytesIntoIterInner {
        owner: IpcBytes,
        #[covariant]
        dependent: SliceIter,
    }
}
2051
2052/// An [`IpcBytes`] iterator that holds a strong reference to it.
2053pub struct IpcBytesIntoIter(IpcBytesIntoIterInner);
2054impl IpcBytesIntoIter {
2055    fn new(bytes: IpcBytes) -> Self {
2056        Self(IpcBytesIntoIterInner::new(bytes, |b| b.iter()))
2057    }
2058
2059    /// The source bytes.
2060    pub fn source(&self) -> &IpcBytes {
2061        self.0.borrow_owner()
2062    }
2063
2064    /// Bytes not yet iterated.
2065    pub fn rest(&self) -> &[u8] {
2066        self.0.borrow_dependent().as_slice()
2067    }
2068}
2069impl Iterator for IpcBytesIntoIter {
2070    type Item = u8;
2071
2072    fn next(&mut self) -> Option<u8> {
2073        self.0.with_dependent_mut(|_, d| d.next().copied())
2074    }
2075
2076    fn size_hint(&self) -> (usize, Option<usize>) {
2077        self.0.borrow_dependent().size_hint()
2078    }
2079
2080    fn count(self) -> usize
2081    where
2082        Self: Sized,
2083    {
2084        self.0.borrow_dependent().as_slice().len()
2085    }
2086
2087    fn nth(&mut self, n: usize) -> Option<u8> {
2088        self.0.with_dependent_mut(|_, d| d.nth(n).copied())
2089    }
2090
2091    fn last(mut self) -> Option<Self::Item>
2092    where
2093        Self: Sized,
2094    {
2095        self.next_back()
2096    }
2097}
2098impl DoubleEndedIterator for IpcBytesIntoIter {
2099    fn next_back(&mut self) -> Option<Self::Item> {
2100        self.0.with_dependent_mut(|_, d| d.next_back().copied())
2101    }
2102
2103    fn nth_back(&mut self, n: usize) -> Option<Self::Item> {
2104        self.0.with_dependent_mut(|_, d| d.nth_back(n).copied())
2105    }
2106}
2107impl FusedIterator for IpcBytesIntoIter {}
2108impl Default for IpcBytesIntoIter {
2109    fn default() -> Self {
2110        IpcBytes::empty().into_iter()
2111    }
2112}
2113impl IntoIterator for IpcBytes {
2114    type Item = u8;
2115
2116    type IntoIter = IpcBytesIntoIter;
2117
2118    fn into_iter(self) -> Self::IntoIter {
2119        IpcBytesIntoIter::new(self)
2120    }
2121}
2122
2123/// An [`IpcBytesCast`] iterator that holds a strong reference to it.
2124pub struct IpcBytesCastIntoIter<T: bytemuck::AnyBitPattern>(IpcBytesIntoIter, IpcBytesCast<T>);
2125impl<T: bytemuck::AnyBitPattern> IpcBytesCastIntoIter<T> {
2126    fn new(bytes: IpcBytesCast<T>) -> Self {
2127        Self(bytes.bytes.clone().into_iter(), bytes)
2128    }
2129
2130    /// The source bytes.
2131    pub fn source(&self) -> &IpcBytesCast<T> {
2132        &self.1
2133    }
2134
2135    /// Items not yet iterated.
2136    pub fn rest(&self) -> &[T] {
2137        bytemuck::cast_slice(self.0.rest())
2138    }
2139}
2140impl<T: bytemuck::AnyBitPattern> Iterator for IpcBytesCastIntoIter<T> {
2141    type Item = T;
2142
2143    fn next(&mut self) -> Option<T> {
2144        let size = size_of::<T>();
2145        let r = *bytemuck::from_bytes(self.0.rest().get(..size)?);
2146        self.0.nth(size - 1);
2147        Some(r)
2148    }
2149
2150    fn size_hint(&self) -> (usize, Option<usize>) {
2151        let (mut min, mut max) = self.0.size_hint();
2152        min /= size_of::<T>();
2153        if let Some(max) = &mut max {
2154            *max /= size_of::<T>();
2155        }
2156        (min, max)
2157    }
2158
2159    fn nth(&mut self, n: usize) -> Option<T> {
2160        let size = size_of::<T>();
2161
2162        let byte_skip = n.checked_mul(size)?;
2163        let byte_end = byte_skip.checked_add(size)?;
2164
2165        let bytes = self.0.rest().get(byte_skip..byte_end)?;
2166        let r = *bytemuck::from_bytes(bytes);
2167
2168        self.0.nth(byte_end - 1);
2169
2170        Some(r)
2171    }
2172
2173    fn last(mut self) -> Option<Self::Item>
2174    where
2175        Self: Sized,
2176    {
2177        self.next_back()
2178    }
2179}
2180impl<T: bytemuck::AnyBitPattern> DoubleEndedIterator for IpcBytesCastIntoIter<T> {
2181    fn next_back(&mut self) -> Option<T> {
2182        let size = size_of::<T>();
2183
2184        let len = self.0.rest().len();
2185        if len < size {
2186            return None;
2187        }
2188
2189        let start = len - size;
2190        let bytes = &self.0.rest()[start..];
2191        let r = *bytemuck::from_bytes(bytes);
2192
2193        self.0.nth_back(size - 1);
2194
2195        Some(r)
2196    }
2197
2198    fn nth_back(&mut self, n: usize) -> Option<T> {
2199        let size = size_of::<T>();
2200
2201        let rev_byte_skip = n.checked_mul(size)?;
2202        let rev_byte_end = rev_byte_skip.checked_add(size)?;
2203        let len = self.0.rest().len();
2204
2205        if len < rev_byte_end {
2206            return None;
2207        }
2208
2209        let start = len - rev_byte_end;
2210        let end = len - rev_byte_skip;
2211
2212        let bytes = &self.0.rest()[start..end];
2213        let r = *bytemuck::from_bytes(bytes);
2214
2215        self.0.nth_back(rev_byte_end - 1);
2216
2217        Some(r)
2218    }
2219}
2220impl<T: bytemuck::AnyBitPattern> FusedIterator for IpcBytesCastIntoIter<T> {}
2221impl<T: bytemuck::AnyBitPattern> Default for IpcBytesCastIntoIter<T> {
2222    fn default() -> Self {
2223        IpcBytes::empty().cast::<T>().into_iter()
2224    }
2225}
2226impl<T: bytemuck::AnyBitPattern> IntoIterator for IpcBytesCast<T> {
2227    type Item = T;
2228
2229    type IntoIter = IpcBytesCastIntoIter<T>;
2230
2231    fn into_iter(self) -> Self::IntoIter {
2232        IpcBytesCastIntoIter::new(self)
2233    }
2234}