zng_task/channel/
ipc_bytes.rs

1#![cfg_attr(not(ipc), allow(unused))]
2
3use std::{
4    cell::Cell,
5    fmt, fs,
6    io::{self, Read, Write},
7    marker::PhantomData,
8    mem::MaybeUninit,
9    ops,
10    path::{Path, PathBuf},
11    pin::Pin,
12    sync::{Arc, Weak},
13};
14
15use futures_lite::AsyncReadExt;
16#[cfg(ipc)]
17use ipc_channel::ipc::IpcSharedMemory;
18use parking_lot::Mutex;
19use serde::{Deserialize, Serialize, de::VariantAccess};
20use zng_app_context::RunOnDrop;
21
/// Immutable bytes vector that can be shared fast over IPC.
///
/// # Memory Storage
///
/// All storage backends are held by an [`Arc`] pointer, so cloning in process is always very cheap.
///
/// The `from_*` constructor functions use different storage depending on byte length. Bytes <= 64KB are allocated in the heap
/// and are copied when shared with another process. Bytes <= 128MB are allocated in an anonymous memory map, only the system handle
/// is copied when shared with another process. Bytes > 128MB are allocated in a temporary file with restricted access and memory mapped
/// for read, only the file path and some metadata are copied when shared with another process.
///
/// Constructor functions for creating memory maps directly are also provided.
///
/// Note that in builds without the `"ipc"` crate feature only the heap backend is available, in that case data of any length is stored in the heap.
///
/// # Serialization
///
/// When serialized inside [`with_ipc_serialization`] the memory map bytes are not copied, only the system handle and metadata are serialized.
/// When serialized in other contexts all bytes are copied.
///
/// When deserializing, memory map handles are reconnected; when deserializing plain bytes the best storage is selected based on data length.
///
/// [`IpcSender`]: super::IpcSender
#[derive(Clone)]
#[repr(transparent)]
pub struct IpcBytes(Arc<IpcBytesData>);
/// Storage backends of [`IpcBytes`].
enum IpcBytesData {
    /// Bytes owned by a heap allocated vector.
    Heap(Vec<u8>),
    /// Bytes held in an `ipc_channel` shared memory region.
    #[cfg(ipc)]
    AnonMemMap(IpcSharedMemory),
    /// Bytes held in a file backed memory map.
    #[cfg(ipc)]
    MemMap(IpcMemMap),
}
/// File backed memory map and the handles that keep it valid.
#[cfg(ipc)]
struct IpcMemMap {
    /// Path of the mapped file, this is what is serialized (with `range`) to reconnect in another process.
    name: PathBuf,
    /// Byte range associated with the map, serialized together with `name`.
    range: ops::Range<usize>,
    /// If `true` the file was provided by the user (`open_memmap*`) and is not removed on drop.
    is_custom: bool,
    /// The read-only map, only taken (set to `None`) on drop.
    map: Option<memmap2::Mmap>,
    /// File handle holding the shared lock, only taken (set to `None`) on drop.
    read_handle: Option<fs::File>,
}
63impl fmt::Debug for IpcBytes {
64    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65        write!(f, "IpcBytes(<{} bytes>)", self.len())
66    }
67}
68impl ops::Deref for IpcBytes {
69    type Target = [u8];
70
71    fn deref(&self) -> &Self::Target {
72        match &*self.0 {
73            IpcBytesData::Heap(i) => i,
74            #[cfg(ipc)]
75            IpcBytesData::AnonMemMap(m) => m,
76            #[cfg(ipc)]
77            IpcBytesData::MemMap(f) => f.map.as_ref().unwrap(),
78        }
79    }
80}
81
82impl IpcBytes {
83    /// New empty.
84    pub fn empty() -> Self {
85        IpcBytes(Arc::new(IpcBytesData::Heap(vec![])))
86    }
87}
88/// Async constructors.
89impl IpcBytes {
90    /// Start a memory efficient async writer for creating a `IpcBytes` with unknown length.
91    pub async fn new_writer() -> IpcBytesWriter {
92        IpcBytesWriter {
93            inner: blocking::Unblock::new(Self::new_writer_blocking()),
94        }
95    }
96
97    /// Allocate zeroed mutable memory that can be written to and then converted to `IpcBytes` fast.
98    pub async fn new_mut(len: usize) -> io::Result<IpcBytesMut> {
99        #[cfg(ipc)]
100        if len <= Self::INLINE_MAX {
101            Ok(IpcBytesMut {
102                len,
103                inner: IpcBytesMutInner::Heap(vec![0; len]),
104            })
105        } else if len <= Self::UNNAMED_MAX {
106            Ok(IpcBytesMut {
107                len,
108                inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
109            })
110        } else {
111            blocking::unblock(move || Self::new_mut_blocking(len)).await
112        }
113
114        #[cfg(not(ipc))]
115        {
116            Ok(IpcBytesMut {
117                len,
118                inner: IpcBytesMutInner::Heap(vec![0; len]),
119            })
120        }
121    }
122
123    /// Copy or move data from vector.
124    pub async fn from_vec(data: Vec<u8>) -> io::Result<Self> {
125        blocking::unblock(move || Self::from_vec_blocking(data)).await
126    }
127
128    /// Read `data` into shared memory.
129    pub async fn from_read(data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
130        #[cfg(ipc)]
131        {
132            Self::from_read_ipc(data).await
133        }
134        #[cfg(not(ipc))]
135        {
136            let mut data = data;
137            let mut buf = vec![];
138            data.read_to_end(&mut buf).await;
139            Self::from_vec(buf).await
140        }
141    }
142    #[cfg(ipc)]
143    async fn from_read_ipc(mut data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
144        let mut buf = vec![0u8; Self::INLINE_MAX + 1];
145        let mut len = 0;
146
147        // INLINE_MAX read
148        loop {
149            match data.read(&mut buf[len..]).await {
150                Ok(l) => {
151                    if l == 0 {
152                        // is <= INLINE_MAX
153                        buf.truncate(len);
154                        return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
155                    } else {
156                        len += l;
157                        if len == Self::INLINE_MAX + 1 {
158                            // goto UNNAMED_MAX read
159                            break;
160                        }
161                    }
162                }
163                Err(e) => match e.kind() {
164                    io::ErrorKind::WouldBlock => continue,
165                    _ => return Err(e),
166                },
167            }
168        }
169
170        // UNNAMED_MAX read
171        buf.resize(Self::UNNAMED_MAX + 1, 0);
172        loop {
173            match data.read(&mut buf[len..]).await {
174                Ok(l) => {
175                    if l == 0 {
176                        // is <= UNNAMED_MAX
177                        return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
178                    } else {
179                        len += l;
180                        if len == Self::UNNAMED_MAX + 1 {
181                            // goto named file loop
182                            break;
183                        }
184                    }
185                }
186                Err(e) => match e.kind() {
187                    io::ErrorKind::WouldBlock => continue,
188                    _ => return Err(e),
189                },
190            }
191        }
192
193        // named file copy
194        Self::new_memmap(async |m| {
195            use futures_lite::AsyncWriteExt as _;
196
197            m.write_all(&buf).await?;
198            crate::io::copy(data, m).await?;
199            Ok(())
200        })
201        .await
202    }
203
204    /// Read `file` into shared memory.
205    pub async fn from_file(file: PathBuf) -> io::Result<Self> {
206        #[cfg(ipc)]
207        {
208            let mut file = crate::fs::File::open(file).await?;
209            let len = file.metadata().await?.len();
210            if len <= Self::UNNAMED_MAX as u64 {
211                let mut buf = vec![0u8; len as usize];
212                file.read_exact(&mut buf).await?;
213                Self::from_vec_blocking(buf)
214            } else {
215                Self::new_memmap(async move |m| {
216                    crate::io::copy(&mut file, m).await?;
217                    Ok(())
218                })
219                .await
220            }
221        }
222        #[cfg(not(ipc))]
223        {
224            let mut file = crate::fs::File::open(file).await?;
225            let mut buf = vec![];
226            file.read_to_end(&mut buf).await?;
227            Self::from_vec_blocking(buf)
228        }
229    }
230
231    /// Create a memory mapped file.
232    ///
233    /// Note that the `from_` functions select optimized backing storage depending on data length, this function
234    /// always selects the slowest options, a file backed memory map.
235    #[cfg(ipc)]
236    pub async fn new_memmap(write: impl AsyncFnOnce(&mut crate::fs::File) -> io::Result<()>) -> io::Result<Self> {
237        let (name, file) = blocking::unblock(Self::create_memmap).await?;
238        let mut file = crate::fs::File::from(file);
239        write(&mut file).await?;
240
241        let mut permissions = file.metadata().await?.permissions();
242        permissions.set_readonly(true);
243        #[cfg(unix)]
244        {
245            use std::os::unix::fs::PermissionsExt;
246            permissions.set_mode(0o400);
247        }
248        file.set_permissions(permissions).await?;
249
250        blocking::unblock(move || {
251            drop(file);
252            let map = IpcMemMap::read(name, None)?;
253            Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
254        })
255        .await
256    }
257
258    /// Memory map an existing file.
259    ///
260    /// The `range` defines the slice of the `file` that will be mapped. Returns [`io::ErrorKind::UnexpectedEof`] if the file does not have enough bytes.
261    ///
262    /// # Safety
263    ///
264    /// Caller must ensure the `file` is not modified while all clones of the `IpcBytes` exists in the current process and others.
265    ///
266    /// Note that the safe [`new_memmap`] function assures safety by retaining a read lock (Windows) and restricting access rights (Unix)
267    /// so that the file data is as read-only as the static data in the current executable file.
268    ///
269    /// [`new_memmap`]: Self::new_memmap
270    #[cfg(ipc)]
271    pub async unsafe fn open_memmap(file: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
272        blocking::unblock(move || {
273            // SAFETY: up to the caller
274            unsafe { Self::open_memmap_blocking(file, range) }
275        })
276        .await
277    }
278
279    /// Gets if both point to the same memory.
280    pub fn ptr_eq(&self, other: &Self) -> bool {
281        let a = &self[..];
282        let b = &other[..];
283        (std::ptr::eq(a, b) && a.len() == b.len()) || (a.is_empty() && b.is_empty())
284    }
285
    /// Maximum length that the `from_*` constructors store in the heap.
    #[cfg(ipc)]
    const INLINE_MAX: usize = 64 * 1024; // 64KB
    /// Maximum length that the `from_*` constructors store in anonymous shared memory,
    /// larger data is stored in a memory mapped file.
    #[cfg(ipc)]
    const UNNAMED_MAX: usize = 128 * 1024 * 1024; // 128MB
290}
291
292/// Blocking constructors.
293impl IpcBytes {
294    /// Start a memory efficient blocking writer for creating a `IpcBytes` with unknown length.
295    pub fn new_writer_blocking() -> IpcBytesWriterBlocking {
296        IpcBytesWriterBlocking {
297            #[cfg(ipc)]
298            heap_buf: vec![],
299            #[cfg(ipc)]
300            memmap: None,
301
302            #[cfg(not(ipc))]
303            heap_buf: std::io::Cursor::new(vec![]),
304        }
305    }
306
307    /// Allocate zeroed mutable memory that can be written to and then converted to `IpcBytes` fast.
308    pub fn new_mut_blocking(len: usize) -> io::Result<IpcBytesMut> {
309        #[cfg(ipc)]
310        if len <= Self::INLINE_MAX {
311            Ok(IpcBytesMut {
312                len,
313                inner: IpcBytesMutInner::Heap(vec![0; len]),
314            })
315        } else if len <= Self::UNNAMED_MAX {
316            Ok(IpcBytesMut {
317                len,
318                inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
319            })
320        } else {
321            let (name, file) = Self::create_memmap()?;
322            file.lock()?;
323            #[cfg(unix)]
324            {
325                let mut permissions = file.metadata()?.permissions();
326                use std::os::unix::fs::PermissionsExt;
327                permissions.set_mode(0o600);
328                file.set_permissions(permissions)?;
329            }
330            file.set_len(len as u64)?;
331            // SAFETY: we hold write lock
332            let map = unsafe { memmap2::MmapMut::map_mut(&file) }?;
333            Ok(IpcBytesMut {
334                len,
335                inner: IpcBytesMutInner::MemMap {
336                    name,
337                    map,
338                    write_handle: file,
339                },
340            })
341        }
342        #[cfg(not(ipc))]
343        {
344            Ok(IpcBytesMut {
345                len,
346                inner: IpcBytesMutInner::Heap(vec![0; len]),
347            })
348        }
349    }
350
351    /// Copy data from slice.
352    pub fn from_slice_blocking(data: &[u8]) -> io::Result<Self> {
353        #[cfg(ipc)]
354        {
355            if data.len() <= Self::INLINE_MAX {
356                Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
357            } else if data.len() <= Self::UNNAMED_MAX {
358                Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(data)))))
359            } else {
360                Self::new_memmap_blocking(|m| m.write_all(data))
361            }
362        }
363        #[cfg(not(ipc))]
364        {
365            Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
366        }
367    }
368
369    /// Copy or move data from vector.
370    pub fn from_vec_blocking(data: Vec<u8>) -> io::Result<Self> {
371        #[cfg(ipc)]
372        {
373            if data.len() <= Self::INLINE_MAX {
374                Ok(Self(Arc::new(IpcBytesData::Heap(data))))
375            } else {
376                Self::from_slice_blocking(&data)
377            }
378        }
379        #[cfg(not(ipc))]
380        {
381            Ok(Self(Arc::new(IpcBytesData::Heap(data))))
382        }
383    }
384
385    /// Read `data` into shared memory.
386    pub fn from_read_blocking(data: &mut dyn io::Read) -> io::Result<Self> {
387        #[cfg(ipc)]
388        {
389            Self::from_read_blocking_ipc(data)
390        }
391        #[cfg(not(ipc))]
392        {
393            let mut buf = vec![];
394            data.read_to_end(&mut buf)?;
395            Self::from_vec_blocking(buf)
396        }
397    }
398    #[cfg(ipc)]
399    fn from_read_blocking_ipc(data: &mut dyn io::Read) -> io::Result<Self> {
400        let mut buf = vec![0u8; Self::INLINE_MAX + 1];
401        let mut len = 0;
402
403        // INLINE_MAX read
404        loop {
405            match data.read(&mut buf[len..]) {
406                Ok(l) => {
407                    if l == 0 {
408                        // is <= INLINE_MAX
409                        buf.truncate(len);
410                        return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
411                    } else {
412                        len += l;
413                        if len == Self::INLINE_MAX + 1 {
414                            // goto UNNAMED_MAX read
415                            break;
416                        }
417                    }
418                }
419                Err(e) => match e.kind() {
420                    io::ErrorKind::WouldBlock => continue,
421                    _ => return Err(e),
422                },
423            }
424        }
425
426        // UNNAMED_MAX read
427        buf.resize(Self::UNNAMED_MAX + 1, 0);
428        loop {
429            match data.read(&mut buf[len..]) {
430                Ok(l) => {
431                    if l == 0 {
432                        // is <= UNNAMED_MAX
433                        return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
434                    } else {
435                        len += l;
436                        if len == Self::UNNAMED_MAX + 1 {
437                            // goto named file loop
438                            break;
439                        }
440                    }
441                }
442                Err(e) => match e.kind() {
443                    io::ErrorKind::WouldBlock => continue,
444                    _ => return Err(e),
445                },
446            }
447        }
448
449        // named file copy
450        Self::new_memmap_blocking(|m| {
451            m.write_all(&buf)?;
452            io::copy(data, m)?;
453            Ok(())
454        })
455    }
456
457    /// Read `file` into shared memory.
458    pub fn from_file_blocking(file: &Path) -> io::Result<Self> {
459        #[cfg(ipc)]
460        {
461            let mut file = fs::File::open(file)?;
462            let len = file.metadata()?.len();
463            if len <= Self::UNNAMED_MAX as u64 {
464                let mut buf = vec![0u8; len as usize];
465                file.read_exact(&mut buf)?;
466                Self::from_vec_blocking(buf)
467            } else {
468                Self::new_memmap_blocking(|m| {
469                    io::copy(&mut file, m)?;
470                    Ok(())
471                })
472            }
473        }
474        #[cfg(not(ipc))]
475        {
476            let mut file = fs::File::open(file)?;
477            let mut buf = vec![];
478            file.read_to_end(&mut buf)?;
479            Self::from_vec_blocking(buf)
480        }
481    }
482
483    /// Create a memory mapped file.
484    ///
485    /// Note that the `from_` functions select optimized backing storage depending on data length, this function
486    /// always selects the slowest options, a file backed memory map.
487    #[cfg(ipc)]
488    pub fn new_memmap_blocking(write: impl FnOnce(&mut fs::File) -> io::Result<()>) -> io::Result<Self> {
489        let (name, mut file) = Self::create_memmap()?;
490        write(&mut file)?;
491        let mut permissions = file.metadata()?.permissions();
492        permissions.set_readonly(true);
493        #[cfg(unix)]
494        {
495            use std::os::unix::fs::PermissionsExt;
496            permissions.set_mode(0o400);
497        }
498        file.set_permissions(permissions)?;
499
500        drop(file);
501        let map = IpcMemMap::read(name, None)?;
502        Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
503    }
504    #[cfg(ipc)]
505    fn create_memmap() -> io::Result<(PathBuf, fs::File)> {
506        static MEMMAP_DIR: Mutex<usize> = Mutex::new(0);
507        let mut count = MEMMAP_DIR.lock();
508
509        if *count == 0 {
510            zng_env::on_process_exit(|_| {
511                IpcBytes::cleanup_memmap_storage();
512            });
513        }
514
515        let dir = zng_env::cache("zng-task-ipc-mem").join(std::process::id().to_string());
516        fs::create_dir_all(&dir)?;
517        let mut name = dir.join(count.to_string());
518        if *count < usize::MAX {
519            *count += 1;
520        } else {
521            // very cold path, in practice the running process will die long before this
522            for i in 0..usize::MAX {
523                name = dir.join(i.to_string());
524                if !name.exists() {
525                    break;
526                }
527            }
528            if name.exists() {
529                return Err(io::Error::new(io::ErrorKind::StorageFull, ""));
530            }
531        };
532
533        // read because some callers create a MmapMut
534        let file = fs::OpenOptions::new()
535            .create(true)
536            .read(true)
537            .write(true)
538            .truncate(true)
539            .open(&name)?;
540        Ok((name, file))
541    }
542    #[cfg(ipc)]
543    fn cleanup_memmap_storage() {
544        if let Ok(dir) = fs::read_dir(zng_env::cache("zng-task-ipc-mem")) {
545            let entries: Vec<_> = dir.flatten().map(|e| e.path()).collect();
546            for entry in entries {
547                if entry.is_dir() {
548                    fs::remove_dir_all(entry).ok();
549                }
550            }
551        }
552    }
553
554    /// Memory map an existing file.
555    ///
556    /// The `range` defines the slice of the `file` that will be mapped. Returns [`io::ErrorKind::UnexpectedEof`] if the file does not have enough bytes.
557    ///
558    /// # Safety
559    ///
560    /// Caller must ensure the `file` is not modified while all clones of the `IpcBytes` exists in the current process and others.
561    ///
562    /// Note that the safe [`new_memmap`] function assures safety by retaining a read lock (Windows) and restricting access rights (Unix)
563    /// so that the file data is as read-only as the static data in the current executable file.
564    ///
565    /// [`new_memmap`]: Self::new_memmap
566    #[cfg(ipc)]
567    pub unsafe fn open_memmap_blocking(file: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
568        let read_handle = fs::File::open(&file)?;
569        read_handle.lock_shared()?;
570        let len = read_handle.metadata()?.len();
571        if let Some(range) = &range
572            && len < range.end as u64
573        {
574            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "file length < range.end"));
575        }
576        // SAFETY: up to the caller.
577        let map = unsafe { memmap2::Mmap::map(&read_handle) }?;
578
579        let range = range.unwrap_or_else(|| 0..map.len());
580
581        Ok(Self(Arc::new(IpcBytesData::MemMap(IpcMemMap {
582            name: file,
583            range,
584            read_handle: Some(read_handle),
585            is_custom: true,
586            map: Some(map),
587        }))))
588    }
589}
590
591impl AsRef<[u8]> for IpcBytes {
592    fn as_ref(&self) -> &[u8] {
593        &self[..]
594    }
595}
596impl Default for IpcBytes {
597    fn default() -> Self {
598        Self::empty()
599    }
600}
601impl PartialEq for IpcBytes {
602    fn eq(&self, other: &Self) -> bool {
603        self.ptr_eq(other) || self[..] == other[..]
604    }
605}
#[cfg(ipc)]
impl IpcMemMap {
    /// Opens the file `name` for read with a shared lock and maps it.
    ///
    /// The `range` is stored with the map, it defaults to the entire mapped length. The returned map is
    /// not marked as custom, so the file is removed when the map is dropped.
    fn read(name: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
        let read_handle = fs::File::open(&name)?;
        read_handle.lock_shared()?;
        // SAFETY: File is marked read-only and a read lock is held for it.
        let map = unsafe { memmap2::Mmap::map(&read_handle) }?;

        let range = match range {
            Some(r) => r,
            None => 0..map.len(),
        };

        Ok(Self {
            name,
            range,
            is_custom: false,
            map: Some(map),
            read_handle: Some(read_handle),
        })
    }
}
#[cfg(ipc)]
impl Serialize for IpcMemMap {
    /// Serializes only the file path and the range, as a tuple.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let payload = (&self.name, &self.range);
        Serialize::serialize(&payload, serializer)
    }
}
#[cfg(ipc)]
impl<'de> Deserialize<'de> for IpcMemMap {
    /// Deserializes the file path and range, then opens and maps the file again.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let (name, range): (PathBuf, ops::Range<usize>) = Deserialize::deserialize(deserializer)?;
        match IpcMemMap::read(name, Some(range)) {
            Ok(map) => Ok(map),
            Err(e) => Err(serde::de::Error::custom(format!("cannot load ipc memory map file, {e}"))),
        }
    }
}
#[cfg(ipc)]
impl Drop for IpcMemMap {
    /// Drops the map and the file handle, then removes the file if it is not a custom file.
    fn drop(&mut self) {
        // the map and the handle are dropped before the file removal is attempted
        drop(self.map.take());
        drop(self.read_handle.take());
        if self.is_custom {
            return;
        }
        // best effort, errors are ignored
        let _ = std::fs::remove_file(&self.name);
    }
}
654
655impl Serialize for IpcBytes {
656    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
657    where
658        S: serde::Serializer,
659    {
660        #[cfg(ipc)]
661        {
662            if is_ipc_serialization() {
663                match &*self.0 {
664                    IpcBytesData::Heap(b) => serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&b[..])),
665                    IpcBytesData::AnonMemMap(b) => serializer.serialize_newtype_variant("IpcBytes", 1, "AnonMemMap", b),
666                    IpcBytesData::MemMap(b) => {
667                        // need to keep alive until other process is also holding it, so we send
668                        // a sender for the other process to signal received.
669                        let (sender, mut recv) = crate::channel::ipc_unbounded::<()>()
670                            .map_err(|e| serde::ser::Error::custom(format!("cannot serialize memmap bytes for ipc, {e}")))?;
671
672                        let r = serializer.serialize_newtype_variant("IpcBytes", 2, "MemMap", &(b, sender))?;
673                        let hold = self.clone();
674                        crate::spawn_wait(move || {
675                            if let Err(e) = recv.recv_blocking() {
676                                tracing::error!("IpcBytes memmap completion signal not received, {e}")
677                            }
678                            drop(hold);
679                        });
680                        Ok(r)
681                    }
682                }
683            } else {
684                serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&self[..]))
685            }
686        }
687        #[cfg(not(ipc))]
688        {
689            serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&self[..]))
690        }
691    }
692}
693impl<'de> Deserialize<'de> for IpcBytes {
694    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
695    where
696        D: serde::Deserializer<'de>,
697    {
698        #[derive(Deserialize)]
699        enum VariantId {
700            Heap,
701            #[cfg(ipc)]
702            AnonMemMap,
703            #[cfg(ipc)]
704            MemMap,
705        }
706
707        struct EnumVisitor;
708        impl<'de> serde::de::Visitor<'de> for EnumVisitor {
709            type Value = IpcBytes;
710
711            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
712                write!(f, "IpcBytes variant")
713            }
714
715            fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
716            where
717                A: serde::de::EnumAccess<'de>,
718            {
719                let (variant, access) = data.variant::<VariantId>()?;
720                match variant {
721                    VariantId::Heap => access.newtype_variant_seed(ByteSliceVisitor),
722                    #[cfg(ipc)]
723                    VariantId::AnonMemMap => Ok(IpcBytes(Arc::new(IpcBytesData::AnonMemMap(access.newtype_variant()?)))),
724                    #[cfg(ipc)]
725                    VariantId::MemMap => {
726                        let (memmap, mut completion_sender): (IpcMemMap, crate::channel::IpcSender<()>) = access.newtype_variant()?;
727                        completion_sender.send_blocking(()).map_err(|e| {
728                            serde::de::Error::custom(format!("cannot deserialize memmap bytes, completion signal failed, {e}"))
729                        })?;
730                        Ok(IpcBytes(Arc::new(IpcBytesData::MemMap(memmap))))
731                    }
732                }
733            }
734        }
735        struct ByteSliceVisitor;
736        impl<'de> serde::de::Visitor<'de> for ByteSliceVisitor {
737            type Value = IpcBytes;
738
739            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
740                write!(f, "byte buffer")
741            }
742
743            fn visit_borrowed_bytes<E>(self, v: &'de [u8]) -> Result<Self::Value, E>
744            where
745                E: serde::de::Error,
746            {
747                IpcBytes::from_slice_blocking(v).map_err(serde::de::Error::custom)
748            }
749
750            fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
751            where
752                E: serde::de::Error,
753            {
754                IpcBytes::from_slice_blocking(v).map_err(serde::de::Error::custom)
755            }
756
757            fn visit_byte_buf<E>(self, v: Vec<u8>) -> Result<Self::Value, E>
758            where
759                E: serde::de::Error,
760            {
761                IpcBytes::from_vec_blocking(v).map_err(serde::de::Error::custom)
762            }
763        }
764        impl<'de> serde::de::DeserializeSeed<'de> for ByteSliceVisitor {
765            type Value = IpcBytes;
766
767            fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
768            where
769                D: serde::Deserializer<'de>,
770            {
771                deserializer.deserialize_bytes(ByteSliceVisitor)
772            }
773        }
774
775        #[cfg(ipc)]
776        {
777            deserializer.deserialize_enum("IpcBytes", &["Heap", "AnonMemMap", "MemMap"], EnumVisitor)
778        }
779        #[cfg(not(ipc))]
780        {
781            deserializer.deserialize_enum("IpcBytes", &["Heap"], EnumVisitor)
782        }
783    }
784}
785
/// Enables special serialization of memory mapped files for the `serialize` call.
///
/// The flag is set only in the current thread, the previous value is restored by a [`RunOnDrop`] guard
/// after `serialize` returns.
///
/// IPC channels like [`IpcSender`] serialize messages inside this context to support [`IpcBytes`] fast memory map sharing across processes.
///
/// You can use the [`is_ipc_serialization`] to check if inside context.
///
/// [`IpcSender`]: super::IpcSender
#[cfg(ipc)]
pub fn with_ipc_serialization<R>(serialize: impl FnOnce() -> R) -> R {
    let previous = IPC_SERIALIZATION.replace(true);
    let _restore = RunOnDrop::new(move || IPC_SERIALIZATION.set(previous));
    serialize()
}
799
/// Checks if is inside [`with_ipc_serialization`].
///
/// The flag is thread local, only the current thread is checked.
#[cfg(ipc)]
pub fn is_ipc_serialization() -> bool {
    IPC_SERIALIZATION.with(Cell::get)
}
805
#[cfg(ipc)]
thread_local! {
    // Set to `true` by `with_ipc_serialization` during the call, read by `is_ipc_serialization`.
    static IPC_SERIALIZATION: Cell<bool> = const { Cell::new(false) };
}
810
811impl IpcBytes {
812    /// Create a weak in process reference.
813    ///
814    /// Note that the weak reference cannot upgrade if only another process holds a strong reference.
815    pub fn downgrade(&self) -> WeakIpcBytes {
816        WeakIpcBytes(Arc::downgrade(&self.0))
817    }
818}
819
820/// Weak reference to an in process [`IpcBytes`].
821pub struct WeakIpcBytes(Weak<IpcBytesData>);
822impl WeakIpcBytes {
823    /// Get strong reference if any exists in the process.
824    pub fn upgrade(&self) -> Option<IpcBytes> {
825        self.0.upgrade().map(IpcBytes)
826    }
827
828    /// Count of strong references in the process.
829    pub fn strong_count(&self) -> usize {
830        self.0.strong_count()
831    }
832}
833
834/// Represents an async [`IpcBytes`] writer.
835///
836/// Use [`IpcBytes::new_writer`] to start writing.
837pub struct IpcBytesWriter {
838    inner: blocking::Unblock<IpcBytesWriterBlocking>,
839}
840impl IpcBytesWriter {
841    /// Finish writing and move data to a shareable [`IpcBytes`].
842    pub async fn finish(self) -> std::io::Result<IpcBytes> {
843        let inner = self.inner.into_inner().await;
844        blocking::unblock(move || inner.finish()).await
845    }
846
847    /// Mode data to an exclusive mutable [`IpcBytes`] that can be further modified, but not resized.
848    pub async fn finish_mut(self) -> std::io::Result<IpcBytesMut> {
849        let inner = self.inner.into_inner().await;
850        blocking::unblock(move || inner.finish_mut()).await
851    }
852}
853impl crate::io::AsyncWrite for IpcBytesWriter {
854    fn poll_write(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8]) -> std::task::Poll<io::Result<usize>> {
855        crate::io::AsyncWrite::poll_write(Pin::new(&mut Pin::get_mut(self).inner), cx, buf)
856    }
857
858    fn poll_flush(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
859        crate::io::AsyncWrite::poll_flush(Pin::new(&mut Pin::get_mut(self).inner), cx)
860    }
861
862    fn poll_close(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
863        crate::io::AsyncWrite::poll_close(Pin::new(&mut Pin::get_mut(self).inner), cx)
864    }
865}
866impl crate::io::AsyncSeek for IpcBytesWriter {
867    fn poll_seek(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, pos: io::SeekFrom) -> std::task::Poll<io::Result<u64>> {
868        crate::io::AsyncSeek::poll_seek(Pin::new(&mut Pin::get_mut(self).inner), cx, pos)
869    }
870}
871
/// Represents a blocking [`IpcBytes`] writer.
///
/// Use [`IpcBytes::new_writer_blocking`] to start writing.
pub struct IpcBytesWriterBlocking {
    // In-memory write buffer. Once the memory map file exists this only holds
    // the bytes that were not written to the file yet.
    #[cfg(ipc)]
    heap_buf: Vec<u8>,
    // Path and write handle of the backing file. Set by `alloc_memmap_file`, which is called
    // when a write would buffer more than `IpcBytes::UNNAMED_MAX` bytes or when seeking.
    #[cfg(ipc)]
    memmap: Option<(PathBuf, std::fs::File)>,

    // Builds without IPC keep all data in the heap, the cursor provides write and seek.
    #[cfg(not(ipc))]
    heap_buf: std::io::Cursor<Vec<u8>>,
}
884impl IpcBytesWriterBlocking {
885    /// Finish writing and move data to a shareable [`IpcBytes`].
886    pub fn finish(self) -> std::io::Result<IpcBytes> {
887        let m = self.finish_mut()?;
888        m.finish_blocking()
889    }
890
891    /// Mode data to an exclusive mutable [`IpcBytes`] that can be further modified, but not resized.
892    pub fn finish_mut(mut self) -> std::io::Result<IpcBytesMut> {
893        self.flush()?;
894        #[cfg(ipc)]
895        {
896            let (len, inner) = match self.memmap {
897                Some((name, write_handle)) => {
898                    // SAFETY: we hold write lock
899                    let map = unsafe { memmap2::MmapMut::map_mut(&write_handle) }?;
900                    let len = map.len();
901                    (len, IpcBytesMutInner::MemMap { name, map, write_handle })
902                }
903                None => {
904                    let len = self.heap_buf.len();
905                    let i = if self.heap_buf.len() > IpcBytes::INLINE_MAX {
906                        IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_bytes(&self.heap_buf))
907                    } else {
908                        IpcBytesMutInner::Heap(self.heap_buf)
909                    };
910                    (len, i)
911                }
912            };
913            Ok(IpcBytesMut { len, inner })
914        }
915        #[cfg(not(ipc))]
916        {
917            let heap_buf = self.heap_buf.into_inner();
918            let len = heap_buf.len();
919            let inner = IpcBytesMutInner::Heap(heap_buf);
920            Ok(IpcBytesMut { len, inner })
921        }
922    }
923
924    #[cfg(ipc)]
925    fn alloc_memmap_file(&mut self) -> io::Result<()> {
926        if self.memmap.is_none() {
927            let (name, file) = IpcBytes::create_memmap()?;
928            file.lock()?;
929            #[cfg(unix)]
930            {
931                let mut permissions = file.metadata()?.permissions();
932                use std::os::unix::fs::PermissionsExt;
933                permissions.set_mode(0o600);
934                file.set_permissions(permissions)?;
935            }
936            self.memmap = Some((name, file));
937        }
938        let file = &mut self.memmap.as_mut().unwrap().1;
939
940        file.write_all(&self.heap_buf)?;
941        // already allocated UNNAMED_MAX, continue using it as a large buffer
942        self.heap_buf.clear();
943        Ok(())
944    }
945}
946impl std::io::Write for IpcBytesWriterBlocking {
947    fn write(&mut self, write_buf: &[u8]) -> io::Result<usize> {
948        #[cfg(ipc)]
949        {
950            if self.heap_buf.len() + write_buf.len() > IpcBytes::UNNAMED_MAX {
951                // write exceed heap buffer, convert to memmap or flush to existing memmap
952                self.alloc_memmap_file()?;
953
954                if write_buf.len() > IpcBytes::UNNAMED_MAX {
955                    // writing massive payload, skip buffer
956                    self.memmap.as_mut().unwrap().1.write_all(write_buf)?;
957                } else {
958                    self.heap_buf.extend_from_slice(write_buf);
959                }
960            } else {
961                if self.memmap.is_none() {
962                    // heap buffer not fully allocated yet, ensure we only allocate up to UNNAMED_MAX
963                    self.heap_buf
964                        .reserve_exact((self.heap_buf.capacity().max(1024) * 2).min(IpcBytes::UNNAMED_MAX));
965                }
966                self.heap_buf.extend_from_slice(write_buf);
967            }
968
969            Ok(write_buf.len())
970        }
971
972        #[cfg(not(ipc))]
973        {
974            std::io::Write::write(&mut self.heap_buf, write_buf)
975        }
976    }
977
978    fn flush(&mut self) -> io::Result<()> {
979        #[cfg(ipc)]
980        if let Some((_, file)) = &mut self.memmap {
981            if !self.heap_buf.is_empty() {
982                file.write_all(&self.heap_buf)?;
983                self.heap_buf.clear();
984            }
985            file.flush()?;
986        }
987        Ok(())
988    }
989}
990impl std::io::Seek for IpcBytesWriterBlocking {
991    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
992        #[cfg(ipc)]
993        {
994            self.alloc_memmap_file()?;
995            let (_, file) = self.memmap.as_mut().unwrap();
996            if !self.heap_buf.is_empty() {
997                file.write_all(&self.heap_buf)?;
998                self.heap_buf.clear();
999            }
1000            file.seek(pos)
1001        }
1002        #[cfg(not(ipc))]
1003        {
1004            std::io::Seek::seek(&mut self.heap_buf, pos)
1005        }
1006    }
1007}
1008
// Storage backends of `IpcBytesMut`.
enum IpcBytesMutInner {
    // Bytes in a heap allocated vector.
    Heap(Vec<u8>),
    // Bytes in shared memory.
    #[cfg(ipc)]
    AnonMemMap(IpcSharedMemory),
    // Bytes in a memory mapped file.
    #[cfg(ipc)]
    MemMap {
        // Path of the mapped file, the file is removed if the `IpcBytesMut` is dropped without finishing.
        name: PathBuf,
        // Writable map of the file.
        map: memmap2::MmapMut,
        // File handle the map was created from.
        write_handle: std::fs::File,
    },
}
1020
/// Represents preallocated exclusive mutable memory for a new [`IpcBytes`].
///
/// Use [`IpcBytes::new_mut`] or [`IpcBytes::new_mut_blocking`] to allocate.
pub struct IpcBytesMut {
    // Storage backend.
    inner: IpcBytesMutInner,
    // Logical length, the bytes deref to the first `len` bytes of the storage.
    // Can be less than the allocated length after `truncate`.
    len: usize,
}
1028impl ops::Deref for IpcBytesMut {
1029    type Target = [u8];
1030
1031    fn deref(&self) -> &Self::Target {
1032        let len = self.len;
1033        match &self.inner {
1034            IpcBytesMutInner::Heap(v) => &v[..len],
1035            #[cfg(ipc)]
1036            IpcBytesMutInner::AnonMemMap(m) => &m[..len],
1037            #[cfg(ipc)]
1038            IpcBytesMutInner::MemMap { map, .. } => &map[..len],
1039        }
1040    }
1041}
1042impl ops::DerefMut for IpcBytesMut {
1043    fn deref_mut(&mut self) -> &mut Self::Target {
1044        let len = self.len;
1045        match &mut self.inner {
1046            IpcBytesMutInner::Heap(v) => &mut v[..len],
1047            #[cfg(ipc)]
1048            IpcBytesMutInner::AnonMemMap(m) => {
1049                // SAFETY: we are the only reference to the map
1050                unsafe { m.deref_mut() }
1051            }
1052            #[cfg(ipc)]
1053            IpcBytesMutInner::MemMap { map, .. } => &mut map[..len],
1054        }
1055    }
1056}
1057impl fmt::Debug for IpcBytesMut {
1058    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1059        write!(f, "IpcBytesMut(<{} bytes>)", self.len())
1060    }
1061}
1062impl IpcBytesMut {
1063    /// Uses `buf` or copies it to exclusive mutable memory.
1064    pub async fn from_vec(buf: Vec<u8>) -> io::Result<Self> {
1065        #[cfg(ipc)]
1066        if buf.len() <= IpcBytes::INLINE_MAX {
1067            Ok(Self {
1068                len: buf.len(),
1069                inner: IpcBytesMutInner::Heap(buf),
1070            })
1071        } else {
1072            blocking::unblock(move || {
1073                let mut b = IpcBytes::new_mut_blocking(buf.len())?;
1074                b[..].copy_from_slice(&buf);
1075                Ok(b)
1076            })
1077            .await
1078        }
1079        #[cfg(not(ipc))]
1080        {
1081            Ok(Self {
1082                len: buf.len(),
1083                inner: IpcBytesMutInner::Heap(buf),
1084            })
1085        }
1086    }
1087
1088    /// Convert to immutable shareable [`IpcBytes`].
1089    pub async fn finish(mut self) -> io::Result<IpcBytes> {
1090        let len = self.len;
1091        let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
1092            IpcBytesMutInner::Heap(mut v) => {
1093                v.truncate(len);
1094                v.shrink_to_fit();
1095                IpcBytesData::Heap(v)
1096            }
1097            #[cfg(ipc)]
1098            IpcBytesMutInner::AnonMemMap(m) => {
1099                if len < IpcBytes::INLINE_MAX {
1100                    IpcBytesData::Heap(m[..len].to_vec())
1101                } else if len < m.len() {
1102                    IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&m[..len]))
1103                } else {
1104                    IpcBytesData::AnonMemMap(m)
1105                }
1106            }
1107            #[cfg(ipc)]
1108            IpcBytesMutInner::MemMap { name, map, write_handle } => {
1109                let len = self.len;
1110                blocking::unblock(move || Self::finish_memmap(name, map, write_handle, len)).await?
1111            }
1112        };
1113        Ok(IpcBytes(Arc::new(data)))
1114    }
1115
1116    #[cfg(ipc)]
1117    fn finish_memmap(name: PathBuf, map: memmap2::MmapMut, write_handle: fs::File, len: usize) -> Result<IpcBytesData, io::Error> {
1118        let alloc_len = map.len();
1119        if alloc_len != len {
1120            write_handle.set_len(len as u64)?;
1121        }
1122        write_handle.unlock()?;
1123        let map = if alloc_len != len {
1124            drop(map);
1125            // SAFETY: we have write access to the file still
1126            unsafe { memmap2::Mmap::map(&write_handle) }?
1127        } else {
1128            map.make_read_only()?
1129        };
1130        let mut permissions = write_handle.metadata()?.permissions();
1131        permissions.set_readonly(true);
1132        #[cfg(unix)]
1133        {
1134            use std::os::unix::fs::PermissionsExt;
1135            permissions.set_mode(0o400);
1136        }
1137        write_handle.set_permissions(permissions)?;
1138        drop(write_handle);
1139        let read_handle = std::fs::File::open(&name)?;
1140        read_handle.lock_shared()?;
1141        Ok(IpcBytesData::MemMap(IpcMemMap {
1142            name,
1143            range: 0..len,
1144            is_custom: false,
1145            map: Some(map),
1146            read_handle: Some(read_handle),
1147        }))
1148    }
1149}
1150impl IpcBytesMut {
1151    /// Uses `buf` or copies it to exclusive mutable memory.
1152    pub fn from_vec_blocking(buf: Vec<u8>) -> io::Result<Self> {
1153        #[cfg(ipc)]
1154        if buf.len() <= IpcBytes::INLINE_MAX {
1155            Ok(Self {
1156                len: buf.len(),
1157                inner: IpcBytesMutInner::Heap(buf),
1158            })
1159        } else {
1160            let mut b = IpcBytes::new_mut_blocking(buf.len())?;
1161            b[..].copy_from_slice(&buf);
1162            Ok(b)
1163        }
1164        #[cfg(not(ipc))]
1165        {
1166            Ok(Self {
1167                len: buf.len(),
1168                inner: IpcBytesMutInner::Heap(buf),
1169            })
1170        }
1171    }
1172
1173    /// Convert to immutable shareable [`IpcBytes`].
1174    pub fn finish_blocking(mut self) -> io::Result<IpcBytes> {
1175        let len = self.len;
1176        let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
1177            IpcBytesMutInner::Heap(mut v) => {
1178                v.truncate(len);
1179                IpcBytesData::Heap(v)
1180            }
1181            #[cfg(ipc)]
1182            IpcBytesMutInner::AnonMemMap(m) => {
1183                if len < IpcBytes::INLINE_MAX {
1184                    IpcBytesData::Heap(m[..len].to_vec())
1185                } else if len < m.len() {
1186                    IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&m[..len]))
1187                } else {
1188                    IpcBytesData::AnonMemMap(m)
1189                }
1190            }
1191            #[cfg(ipc)]
1192            IpcBytesMutInner::MemMap { name, map, write_handle } => Self::finish_memmap(name, map, write_handle, len)?,
1193        };
1194        Ok(IpcBytes(Arc::new(data)))
1195    }
1196}
// Removes the memory map file of bytes that were dropped without finishing.
#[cfg(ipc)]
impl Drop for IpcBytesMut {
    fn drop(&mut self) {
        let inner = std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(Vec::new()));
        let IpcBytesMutInner::MemMap { name, map, write_handle } = inner else {
            return;
        };
        // the map and the handle are released before the file is removed
        drop(map);
        drop(write_handle);
        let _ = std::fs::remove_file(name);
    }
}
1207
1208/// Safe bytemuck casting wrapper for [`IpcBytesMut`].
1209///
1210/// Use [`IpcBytesMut::cast`] to cast.
1211pub struct IpcBytesMutCast<T: bytemuck::AnyBitPattern> {
1212    bytes: IpcBytesMut,
1213    _t: PhantomData<T>,
1214}
1215impl<T: bytemuck::AnyBitPattern> ops::Deref for IpcBytesMutCast<T> {
1216    type Target = [T];
1217
1218    fn deref(&self) -> &Self::Target {
1219        bytemuck::cast_slice::<u8, T>(&self.bytes)
1220    }
1221}
1222impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> ops::DerefMut for IpcBytesMutCast<T> {
1223    fn deref_mut(&mut self) -> &mut Self::Target {
1224        bytemuck::cast_slice_mut::<u8, T>(&mut self.bytes)
1225    }
1226}
1227impl<T: bytemuck::AnyBitPattern> IpcBytesMutCast<T> {
1228    /// Convert back to [`IpcBytesMut`].
1229    pub fn into_inner(self) -> IpcBytesMut {
1230        self.bytes
1231    }
1232}
1233impl<T: bytemuck::AnyBitPattern> From<IpcBytesMutCast<T>> for IpcBytesMut {
1234    fn from(value: IpcBytesMutCast<T>) -> Self {
1235        value.bytes
1236    }
1237}
1238impl IpcBytesMut {
1239    /// Safe bytemuck casting wrapper.
1240    ///
1241    /// The wrapper will deref to `[T]` and can be converted back to `IpcBytesMust`.
1242    ///
1243    /// # Panics
1244    ///
1245    /// Panics if cannot cast, se [bytemuck docs] for details.
1246    ///
1247    /// [bytemuck docs]: https://docs.rs/bytemuck/1.24.0/bytemuck/fn.try_cast_slice.html
1248    pub fn cast<T: bytemuck::AnyBitPattern>(self) -> IpcBytesMutCast<T> {
1249        let r = IpcBytesMutCast {
1250            bytes: self,
1251            _t: PhantomData,
1252        };
1253        let _assert = &r[..];
1254        r
1255    }
1256
1257    /// Safe bytemuck cast to slice.
1258    ///
1259    /// # Panics
1260    ///
1261    /// Panics if cannot cast, se [bytemuck docs] for details.
1262    ///
1263    /// [bytemuck docs]: https://docs.rs/bytemuck/1.24.0/bytemuck/fn.try_cast_slice.html
1264    pub fn cast_deref<T: bytemuck::AnyBitPattern>(&self) -> &[T] {
1265        bytemuck::cast_slice(self)
1266    }
1267
1268    /// Safe bytemuck cast to mutable slice.
1269    ///
1270    /// # Panics
1271    ///
1272    /// Panics if cannot cast, se [bytemuck docs] for details.
1273    ///
1274    /// [bytemuck docs]: https://docs.rs/bytemuck/1.24.0/bytemuck/fn.try_cast_slice.html
1275    pub fn cast_deref_mut<T: bytemuck::AnyBitPattern + bytemuck::NoUninit>(&mut self) -> &mut [T] {
1276        bytemuck::cast_slice_mut(self)
1277    }
1278}
1279
1280/// Safe bytemuck casting wrapper for [`IpcBytes`].
1281///
1282/// Use [`IpcBytes::cast`] to cast.
1283pub struct IpcBytesCast<T: bytemuck::AnyBitPattern> {
1284    bytes: IpcBytes,
1285    _t: PhantomData<T>,
1286}
1287impl<T: bytemuck::AnyBitPattern> ops::Deref for IpcBytesCast<T> {
1288    type Target = [T];
1289
1290    fn deref(&self) -> &Self::Target {
1291        bytemuck::cast_slice::<u8, T>(&self.bytes)
1292    }
1293}
1294impl<T: bytemuck::AnyBitPattern> IpcBytesCast<T> {
1295    /// Convert back to [`IpcBytes`].
1296    pub fn into_inner(self) -> IpcBytes {
1297        self.bytes
1298    }
1299}
1300impl<T: bytemuck::AnyBitPattern> From<IpcBytesCast<T>> for IpcBytes {
1301    fn from(value: IpcBytesCast<T>) -> Self {
1302        value.bytes
1303    }
1304}
1305impl IpcBytes {
1306    /// Safe bytemuck casting wrapper.
1307    ///
1308    /// The wrapper will deref to `[T]` and can be converted back to `IpcBytes`.
1309    ///
1310    /// # Panics
1311    ///
1312    /// Panics if cannot cast, se [bytemuck docs] for details.
1313    ///
1314    /// [bytemuck docs]: https://docs.rs/bytemuck/1.24.0/bytemuck/fn.try_cast_slice.html
1315    pub fn cast<T: bytemuck::AnyBitPattern>(self) -> IpcBytesCast<T> {
1316        let r = IpcBytesCast {
1317            bytes: self,
1318            _t: PhantomData,
1319        };
1320        let _assert = &r[..];
1321        r
1322    }
1323
1324    /// Safe bytemuck cast to slice.
1325    ///
1326    /// # Panics
1327    ///
1328    /// Panics if cannot cast, se [bytemuck docs] for details.
1329    ///
1330    /// [bytemuck docs]: https://docs.rs/bytemuck/1.24.0/bytemuck/fn.try_cast_slice.html
1331    pub fn cast_deref<T: bytemuck::AnyBitPattern>(&self) -> &[T] {
1332        bytemuck::cast_slice(self)
1333    }
1334}
1335
1336impl IpcBytesMut {
1337    /// Shorten the bytes length.
1338    ///
1339    /// If `new_len` is greater or equal to current length does nothing.
1340    ///
1341    /// Note that this does not affect memory allocation, the extra bytes are only dropped on finish.
1342    pub fn truncate(&mut self, new_len: usize) {
1343        self.len = self.len.min(new_len);
1344    }
1345
1346    /// Convert chunks of length `L0` to chunks of length `L1` where `L1 <= L0`.
1347    ///
1348    /// Reuses the same allocation for the new data, replacing in place. Truncates the final buffer to the new length.
1349    ///
1350    /// Note that no memory is released by this call as truncated is lazy and applied on finish.
1351    ///
1352    /// # Panics
1353    ///
1354    /// Panics if `L1 > L0` or if bytes length is not multiple of `L0`.
1355    pub fn reduce_in_place<const L0: usize, const L1: usize>(&mut self, mut reduce: impl FnMut([u8; L0]) -> [u8; L1]) {
1356        assert!(L1 <= L0);
1357
1358        let self_ = &mut self[..];
1359
1360        let len = self_.len();
1361        if len == 0 {
1362            return;
1363        }
1364        assert!(len.is_multiple_of(L0), "length must be multiple of L0");
1365
1366        let ptr = self_.as_mut_ptr();
1367        let mut write = 0usize;
1368        let mut read = 0usize;
1369
1370        // SAFETY: pointers stay inside slice, in_chunk and out_chunk copy never overlaps with slice.
1371        unsafe {
1372            while read < len {
1373                let mut in_chunk = MaybeUninit::<[u8; L0]>::uninit();
1374                std::ptr::copy_nonoverlapping(ptr.add(read), (*in_chunk.as_mut_ptr()).as_mut_ptr(), L0);
1375                read += L0;
1376
1377                let out_chunk = reduce(in_chunk.assume_init());
1378
1379                std::ptr::copy_nonoverlapping(out_chunk.as_ptr(), ptr.add(write), L1);
1380                write += L1;
1381            }
1382        }
1383
1384        self.truncate(write);
1385    }
1386
1387    /// Convert chunks of `in_chunk_len` to chunks of `out_chunk_buf.len()` where `out_chunk_buf.len() <= in_chunk_len`.
1388    ///
1389    /// Reuses the same allocation for the new data, replacing in place. Truncates the final buffer to the new length.
1390    ///
1391    /// Note that no memory is released by this call as truncated is lazy and applied on finish.
1392    ///
1393    /// # Panics
1394    ///
1395    /// Panics if `out_chunk_buf.len() > in_chunk_len` or if bytes length is not multiple of `in_chunk_len`.
1396    pub fn reduce_in_place_dyn(&mut self, in_chunk_len: usize, out_chunk_buf: &mut [u8], mut reduce: impl FnMut(&[u8], &mut [u8])) {
1397        assert!(out_chunk_buf.len() < in_chunk_len);
1398
1399        let self_ = &mut self[..];
1400
1401        let len = self_.len();
1402        if len == 0 {
1403            return;
1404        }
1405        assert!(len.is_multiple_of(in_chunk_len), "length must be multiple of in_chunk_len");
1406
1407        let ptr = self_.as_mut_ptr();
1408        let mut write = 0usize;
1409        let mut read = 0usize;
1410
1411        // SAFETY: pointers stay inside slice, in_chunk and out_chunk copy never overlaps with slice.
1412        unsafe {
1413            while read < len {
1414                reduce(std::slice::from_raw_parts(ptr.add(read), in_chunk_len), &mut *out_chunk_buf);
1415                read += in_chunk_len;
1416
1417                std::ptr::copy_nonoverlapping(out_chunk_buf.as_ptr(), ptr.add(write), out_chunk_buf.len());
1418                write += out_chunk_buf.len();
1419            }
1420        }
1421
1422        self.truncate(write);
1423    }
1424
1425    /// Convert chunks of length `L0` to chunks of length `L1` where `size_of::<T1>() * L1 <= size_of::<T0>() * L0`.
1426    ///
1427    /// Reuses the same allocation for the new data, replacing in place. Truncates the final buffer to the new length.
1428    ///
1429    /// Note that no memory is released by this call as truncated is lazy and applied on finish.
1430    ///
1431    /// # Panics
1432    ///
1433    /// Panics if `size_of::<T1>() * L1 > size_of::<T0>() * L0` or if bytes length is not multiple of `size_of::<T0>() * L0`.
1434    pub fn cast_reduce_in_place<T0, const L0: usize, T1, const L1: usize>(&mut self, mut reduce: impl FnMut([T0; L0]) -> [T1; L1])
1435    where
1436        T0: bytemuck::AnyBitPattern,
1437    {
1438        let l0 = std::mem::size_of::<T0>() * L0;
1439        let l1 = std::mem::size_of::<T1>() * L1;
1440        assert!(l1 <= l0);
1441
1442        let self_ = &mut self[..];
1443
1444        let len = self_.len();
1445        if len == 0 {
1446            return;
1447        }
1448        assert!(len.is_multiple_of(l0), "length must be multiple of size_of::<T0>() * L0");
1449
1450        let ptr = self_.as_mut_ptr();
1451        let mut write = 0usize;
1452        let mut read = 0usize;
1453
1454        // SAFETY:
1455        // Pointers stay inside slice, in_chunk and out_chunk copy never overlaps with slice.
1456        // Reading [T0; L0] from raw bytes is safe because T0: AnyBitPattern
1457        unsafe {
1458            while read < len {
1459                let mut in_chunk = MaybeUninit::<[T0; L0]>::uninit();
1460                std::ptr::copy_nonoverlapping(ptr.add(read), (*in_chunk.as_mut_ptr()).as_mut_ptr() as _, l0);
1461                read += l0;
1462
1463                let out_chunk = reduce(in_chunk.assume_init());
1464
1465                std::ptr::copy_nonoverlapping(out_chunk.as_ptr() as _, ptr.add(write), l1);
1466                write += l1;
1467            }
1468        }
1469
1470        self.truncate(write);
1471    }
1472
1473    /// Convert chunks of `size_of::<T0>() * in_chunk_len` to chunks of `size_of::<T1>() * out_chunk_buf.len()`
1474    /// where `size_of::<T1>() * out_chunk_buf.len() <= size_of::<T0>() * in_chunk_len`.
1475    ///
1476    /// Reuses the same allocation for the new data, replacing in place. Truncates the final buffer to the new length.
1477    ///
1478    /// Note that no memory is released by this call as truncated is lazy and applied on finish.
1479    ///
1480    /// # Panics
1481    ///
1482    /// Panics if `size_of::<T1>() * out_chunk_buf.len() > size_of::<T0>() * in_chunk_len` or if bytes
1483    /// length is not multiple of `size_of::<T0>() * in_chunk_len`.
1484    pub fn cast_reduce_in_place_dyn<T0, T1>(
1485        &mut self,
1486        in_chunk_len: usize,
1487        out_chunk_buf: &mut [T1],
1488        mut reduce: impl FnMut(&[T0], &mut [T1]),
1489    ) where
1490        T0: bytemuck::AnyBitPattern,
1491    {
1492        let l0 = std::mem::size_of::<T0>() * in_chunk_len;
1493        let l1 = std::mem::size_of_val(out_chunk_buf);
1494
1495        assert!(l1 <= l0);
1496
1497        let self_ = &mut self[..];
1498
1499        let len = self_.len();
1500        if len == 0 {
1501            return;
1502        }
1503        assert!(len.is_multiple_of(l0), "length must be multiple of size_of::<T0>() * in_chunk_len");
1504
1505        let ptr = self_.as_mut_ptr();
1506        let mut write = 0usize;
1507        let mut read = 0usize;
1508
1509        // SAFETY: pointers stay inside slice, in_chunk and out_chunk copy never overlaps with slice.
1510        unsafe {
1511            while read < len {
1512                reduce(
1513                    bytemuck::cast_slice(std::slice::from_raw_parts(ptr.add(read), l0)),
1514                    &mut *out_chunk_buf,
1515                );
1516                read += l0;
1517
1518                std::ptr::copy_nonoverlapping(out_chunk_buf.as_ptr() as _, ptr.add(write), l1);
1519                write += l1;
1520            }
1521        }
1522
1523        self.truncate(write);
1524    }
1525
1526    /// Reverses the order of chunks in the slice, in place.
1527    ///
1528    /// Chunk length is const L.
1529    ///
1530    /// # Panics
1531    ///
1532    /// Panics if length is not multiple of `L`.
1533    pub fn reverse_chunks<const L: usize>(&mut self) {
1534        let self_ = &mut self[..];
1535
1536        let len = self_.len();
1537
1538        if len == 0 || L == 0 {
1539            return;
1540        }
1541
1542        if L == 1 {
1543            return self_.reverse();
1544        }
1545
1546        assert!(len.is_multiple_of(L), "length must be multiple of L");
1547
1548        // SAFETY: already verified is multiple and already handled L=0
1549        unsafe { self_.as_chunks_unchecked_mut::<L>() }.reverse();
1550    }
1551
1552    /// Reverses the order of chunks in the slice, in place.
1553    ///
1554    /// # Panics
1555    ///
1556    /// Panics if length is not multiple of `chunk_len`.
1557    pub fn reverse_chunks_dyn(&mut self, chunk_len: usize) {
1558        let self_ = &mut self[..];
1559
1560        let len = self_.len();
1561
1562        if len == 0 || chunk_len == 0 {
1563            return;
1564        }
1565
1566        if chunk_len == 1 {
1567            return self_.reverse();
1568        }
1569
1570        assert!(len.is_multiple_of(chunk_len), "length must be multiple of chunk_len");
1571
1572        let mut a = 0;
1573        let mut b = len - chunk_len;
1574
1575        let ptr = self_.as_mut_ptr();
1576
1577        // SAFETY: chunks are not overlapping and loop stops before at mid, chunk_len > 1
1578        unsafe {
1579            while a < b {
1580                std::ptr::swap_nonoverlapping(ptr.add(a), ptr.add(b), chunk_len);
1581                a += chunk_len;
1582                b -= chunk_len;
1583            }
1584        }
1585    }
1586}