zng_task/channel/
ipc_bytes.rs

1#![cfg_attr(not(ipc), allow(unused))]
2
3use std::{
4    cell::Cell,
5    fmt, fs,
6    io::{self, Read, Write},
7    ops,
8    path::{Path, PathBuf},
9    pin::Pin,
10    sync::{Arc, Weak},
11};
12
13use futures_lite::AsyncReadExt;
14#[cfg(ipc)]
15use ipc_channel::ipc::IpcSharedMemory;
16use parking_lot::Mutex;
17use serde::{Deserialize, Serialize, de::VariantAccess};
18use zng_app_context::RunOnDrop;
19
20/// Immutable bytes vector that can be can be shared fast over IPC.
21///
22/// # Memory Storage
23///
24/// All storage backends are held by a [`Arc`] pointer, so cloning in process is always very cheap.
25///
26/// The `from_*` constructor functions use different storage depending on byte length. Bytes <= 64KB are allocated in the heap
27/// and are copied when shared with another process. Bytes <= 128MB are allocated in an anonymous memory map, only the system handle
28/// is copied when shared with another process. Bytes > 128MB are allocated in a temporary file with restricted access and memory mapped
29/// for read, only the file path and some metadata are copied when shared with another process.
30///
31/// Constructor functions for creating memory maps directly are also provided.
32///
33/// Note that in builds without the `"ipc"` crate feature only heap backend is available, in that case all data lengths are stored in the heap.
34///
35/// # Serialization
36///
37/// When serialized inside [`with_ipc_serialization`] the memory map bytes are not copied, only the system handle and metadata is serialized.
38/// When serialized in other contexts all bytes are copied.
39///
40/// When deserializing memory map handles are reconnected and if deserializing bytes selects the best storage based on data length.
41///
42/// [`IpcSender`]: super::IpcSender
43#[derive(Clone)]
44#[repr(transparent)]
45pub struct IpcBytes(Arc<IpcBytesData>);
46enum IpcBytesData {
47    Heap(Vec<u8>),
48    #[cfg(ipc)]
49    AnonMemMap(IpcSharedMemory),
50    #[cfg(ipc)]
51    MemMap(IpcMemMap),
52}
53#[cfg(ipc)]
54struct IpcMemMap {
55    name: PathBuf,
56    range: ops::Range<usize>,
57    is_custom: bool,
58    map: Option<memmap2::Mmap>,
59    read_handle: Option<fs::File>,
60}
61impl fmt::Debug for IpcBytes {
62    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63        write!(f, "IpcBytes(<{} bytes>)", self.len())
64    }
65}
66impl ops::Deref for IpcBytes {
67    type Target = [u8];
68
69    fn deref(&self) -> &Self::Target {
70        match &*self.0 {
71            IpcBytesData::Heap(i) => i,
72            #[cfg(ipc)]
73            IpcBytesData::AnonMemMap(m) => m,
74            #[cfg(ipc)]
75            IpcBytesData::MemMap(f) => f.map.as_ref().unwrap(),
76        }
77    }
78}
79
80impl IpcBytes {
81    /// New empty.
82    pub fn empty() -> Self {
83        IpcBytes(Arc::new(IpcBytesData::Heap(vec![])))
84    }
85}
86/// Async constructors.
87impl IpcBytes {
88    /// Start a memory efficient async writer for creating a `IpcBytes` with unknown length.
89    pub async fn new_writer() -> IpcBytesWriter {
90        IpcBytesWriter {
91            inner: blocking::Unblock::new(Self::new_writer_blocking()),
92        }
93    }
94
95    /// Allocate zeroed mutable memory that can be written to and then converted to `IpcBytes` fast.
96    pub async fn new_mut(len: usize) -> io::Result<IpcBytesMut> {
97        #[cfg(ipc)]
98        if len <= Self::INLINE_MAX {
99            Ok(IpcBytesMut {
100                inner: IpcBytesMutInner::Heap(vec![0; len]),
101            })
102        } else if len <= Self::UNNAMED_MAX {
103            Ok(IpcBytesMut {
104                inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
105            })
106        } else {
107            blocking::unblock(move || Self::new_mut_blocking(len)).await
108        }
109
110        #[cfg(not(ipc))]
111        {
112            Ok(IpcBytesMut {
113                inner: IpcBytesMutInner::Heap(vec![0; len]),
114            })
115        }
116    }
117
118    /// Copy or move data from vector.
119    pub async fn from_vec(data: Vec<u8>) -> io::Result<Self> {
120        blocking::unblock(move || Self::from_vec_blocking(data)).await
121    }
122
123    /// Read `data` into shared memory.
124    pub async fn from_read(data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
125        #[cfg(ipc)]
126        {
127            Self::from_read_ipc(data).await
128        }
129        #[cfg(not(ipc))]
130        {
131            let mut data = data;
132            let mut buf = vec![];
133            data.read_to_end(&mut buf).await;
134            Self::from_vec(buf).await
135        }
136    }
137    #[cfg(ipc)]
138    async fn from_read_ipc(mut data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
139        let mut buf = vec![0u8; Self::INLINE_MAX + 1];
140        let mut len = 0;
141
142        // INLINE_MAX read
143        loop {
144            match data.read(&mut buf[len..]).await {
145                Ok(l) => {
146                    if l == 0 {
147                        // is <= INLINE_MAX
148                        buf.truncate(len);
149                        return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
150                    } else {
151                        len += l;
152                        if len == Self::INLINE_MAX + 1 {
153                            // goto UNNAMED_MAX read
154                            break;
155                        }
156                    }
157                }
158                Err(e) => match e.kind() {
159                    io::ErrorKind::WouldBlock => continue,
160                    _ => return Err(e),
161                },
162            }
163        }
164
165        // UNNAMED_MAX read
166        buf.resize(Self::UNNAMED_MAX + 1, 0);
167        loop {
168            match data.read(&mut buf[len..]).await {
169                Ok(l) => {
170                    if l == 0 {
171                        // is <= UNNAMED_MAX
172                        return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
173                    } else {
174                        len += l;
175                        if len == Self::UNNAMED_MAX + 1 {
176                            // goto named file loop
177                            break;
178                        }
179                    }
180                }
181                Err(e) => match e.kind() {
182                    io::ErrorKind::WouldBlock => continue,
183                    _ => return Err(e),
184                },
185            }
186        }
187
188        // named file copy
189        Self::new_memmap(async |m| {
190            use futures_lite::AsyncWriteExt as _;
191
192            m.write_all(&buf).await?;
193            crate::io::copy(data, m).await?;
194            Ok(())
195        })
196        .await
197    }
198
199    /// Read `file` into shared memory.
200    pub async fn from_file(file: PathBuf) -> io::Result<Self> {
201        #[cfg(ipc)]
202        {
203            let mut file = crate::fs::File::open(file).await?;
204            let len = file.metadata().await?.len();
205            if len <= Self::UNNAMED_MAX as u64 {
206                let mut buf = vec![0u8; len as usize];
207                file.read_exact(&mut buf).await?;
208                Self::from_vec_blocking(buf)
209            } else {
210                Self::new_memmap(async move |m| {
211                    crate::io::copy(&mut file, m).await?;
212                    Ok(())
213                })
214                .await
215            }
216        }
217        #[cfg(not(ipc))]
218        {
219            let mut file = crate::fs::File::open(file).await?;
220            let mut buf = vec![];
221            file.read_to_end(&mut buf).await?;
222            Self::from_vec_blocking(buf)
223        }
224    }
225
226    /// Create a memory mapped file.
227    ///
228    /// Note that the `from_` functions select optimized backing storage depending on data length, this function
229    /// always selects the slowest options, a file backed memory map.
230    #[cfg(ipc)]
231    pub async fn new_memmap(write: impl AsyncFnOnce(&mut crate::fs::File) -> io::Result<()>) -> io::Result<Self> {
232        let (name, file) = blocking::unblock(Self::create_memmap).await?;
233        let mut file = crate::fs::File::from(file);
234        write(&mut file).await?;
235
236        let mut permissions = file.metadata().await?.permissions();
237        permissions.set_readonly(true);
238        #[cfg(unix)]
239        {
240            use std::os::unix::fs::PermissionsExt;
241            permissions.set_mode(0o400);
242        }
243        file.set_permissions(permissions).await?;
244
245        blocking::unblock(move || {
246            drop(file);
247            let map = IpcMemMap::read(name, None)?;
248            Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
249        })
250        .await
251    }
252
253    /// Memory map an existing file.
254    ///
255    /// The `range` defines the slice of the `file` that will be mapped. Returns [`io::ErrorKind::UnexpectedEof`] if the file does not have enough bytes.
256    ///
257    /// # Safety
258    ///
259    /// Caller must ensure the `file` is not modified while all clones of the `IpcBytes` exists in the current process and others.
260    ///
261    /// Note that the safe [`new_memmap`] function assures safety by retaining a read lock (Windows) and restricting access rights (Unix)
262    /// so that the file data is as read-only as the static data in the current executable file.
263    ///
264    /// [`new_memmap`]: Self::new_memmap
265    #[cfg(ipc)]
266    pub async unsafe fn open_memmap(file: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
267        blocking::unblock(move || {
268            // SAFETY: up to the caller
269            unsafe { Self::open_memmap_blocking(file, range) }
270        })
271        .await
272    }
273
274    /// Gets if both point to the same memory.
275    pub fn ptr_eq(&self, other: &Self) -> bool {
276        let a = &self[..];
277        let b = &other[..];
278        (std::ptr::eq(a, b) && a.len() == b.len()) || (a.is_empty() && b.is_empty())
279    }
280
281    #[cfg(ipc)]
282    const INLINE_MAX: usize = 64 * 1024; // 64KB
283    #[cfg(ipc)]
284    const UNNAMED_MAX: usize = 128 * 1024 * 1024; // 128MB
285}
286
287/// Blocking constructors.
288impl IpcBytes {
289    /// Start a memory efficient blocking writer for creating a `IpcBytes` with unknown length.
290    pub fn new_writer_blocking() -> IpcBytesWriterBlocking {
291        IpcBytesWriterBlocking {
292            #[cfg(ipc)]
293            heap_buf: vec![],
294            #[cfg(ipc)]
295            memmap: None,
296
297            #[cfg(not(ipc))]
298            heap_buf: std::io::Cursor::new(vec![]),
299        }
300    }
301
302    /// Allocate zeroed mutable memory that can be written to and then converted to `IpcBytes` fast.
303    pub fn new_mut_blocking(len: usize) -> io::Result<IpcBytesMut> {
304        #[cfg(ipc)]
305        if len <= Self::INLINE_MAX {
306            Ok(IpcBytesMut {
307                inner: IpcBytesMutInner::Heap(vec![0; len]),
308            })
309        } else if len <= Self::UNNAMED_MAX {
310            Ok(IpcBytesMut {
311                inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
312            })
313        } else {
314            let (name, file) = Self::create_memmap()?;
315            file.lock()?;
316            #[cfg(unix)]
317            {
318                let mut permissions = file.metadata()?.permissions();
319                use std::os::unix::fs::PermissionsExt;
320                permissions.set_mode(0o600);
321                file.set_permissions(permissions)?;
322            }
323            file.set_len(len as u64)?;
324            // SAFETY: we hold write lock
325            let map = unsafe { memmap2::MmapMut::map_mut(&file) }?;
326            Ok(IpcBytesMut {
327                inner: IpcBytesMutInner::MemMap {
328                    name,
329                    map,
330                    write_handle: file,
331                },
332            })
333        }
334        #[cfg(not(ipc))]
335        {
336            Ok(IpcBytesMut {
337                inner: IpcBytesMutInner::Heap(vec![0; len]),
338            })
339        }
340    }
341
342    /// Copy data from slice.
343    pub fn from_slice_blocking(data: &[u8]) -> io::Result<Self> {
344        #[cfg(ipc)]
345        {
346            if data.len() <= Self::INLINE_MAX {
347                Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
348            } else if data.len() <= Self::UNNAMED_MAX {
349                Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(data)))))
350            } else {
351                Self::new_memmap_blocking(|m| m.write_all(data))
352            }
353        }
354        #[cfg(not(ipc))]
355        {
356            Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
357        }
358    }
359
360    /// Copy or move data from vector.
361    pub fn from_vec_blocking(data: Vec<u8>) -> io::Result<Self> {
362        #[cfg(ipc)]
363        {
364            if data.len() <= Self::INLINE_MAX {
365                Ok(Self(Arc::new(IpcBytesData::Heap(data))))
366            } else {
367                Self::from_slice_blocking(&data)
368            }
369        }
370        #[cfg(not(ipc))]
371        {
372            Ok(Self(Arc::new(IpcBytesData::Heap(data))))
373        }
374    }
375
376    /// Read `data` into shared memory.
377    pub fn from_read_blocking(data: &mut dyn io::Read) -> io::Result<Self> {
378        #[cfg(ipc)]
379        {
380            Self::from_read_blocking_ipc(data)
381        }
382        #[cfg(not(ipc))]
383        {
384            let mut buf = vec![];
385            data.read_to_end(&mut buf)?;
386            Self::from_vec_blocking(buf)
387        }
388    }
389    #[cfg(ipc)]
390    fn from_read_blocking_ipc(data: &mut dyn io::Read) -> io::Result<Self> {
391        let mut buf = vec![0u8; Self::INLINE_MAX + 1];
392        let mut len = 0;
393
394        // INLINE_MAX read
395        loop {
396            match data.read(&mut buf[len..]) {
397                Ok(l) => {
398                    if l == 0 {
399                        // is <= INLINE_MAX
400                        buf.truncate(len);
401                        return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
402                    } else {
403                        len += l;
404                        if len == Self::INLINE_MAX + 1 {
405                            // goto UNNAMED_MAX read
406                            break;
407                        }
408                    }
409                }
410                Err(e) => match e.kind() {
411                    io::ErrorKind::WouldBlock => continue,
412                    _ => return Err(e),
413                },
414            }
415        }
416
417        // UNNAMED_MAX read
418        buf.resize(Self::UNNAMED_MAX + 1, 0);
419        loop {
420            match data.read(&mut buf[len..]) {
421                Ok(l) => {
422                    if l == 0 {
423                        // is <= UNNAMED_MAX
424                        return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
425                    } else {
426                        len += l;
427                        if len == Self::UNNAMED_MAX + 1 {
428                            // goto named file loop
429                            break;
430                        }
431                    }
432                }
433                Err(e) => match e.kind() {
434                    io::ErrorKind::WouldBlock => continue,
435                    _ => return Err(e),
436                },
437            }
438        }
439
440        // named file copy
441        Self::new_memmap_blocking(|m| {
442            m.write_all(&buf)?;
443            io::copy(data, m)?;
444            Ok(())
445        })
446    }
447
448    /// Read `file` into shared memory.
449    pub fn from_file_blocking(file: &Path) -> io::Result<Self> {
450        #[cfg(ipc)]
451        {
452            let mut file = fs::File::open(file)?;
453            let len = file.metadata()?.len();
454            if len <= Self::UNNAMED_MAX as u64 {
455                let mut buf = vec![0u8; len as usize];
456                file.read_exact(&mut buf)?;
457                Self::from_vec_blocking(buf)
458            } else {
459                Self::new_memmap_blocking(|m| {
460                    io::copy(&mut file, m)?;
461                    Ok(())
462                })
463            }
464        }
465        #[cfg(not(ipc))]
466        {
467            let mut file = fs::File::open(file)?;
468            let mut buf = vec![];
469            file.read_to_end(&mut buf)?;
470            Self::from_vec_blocking(buf)
471        }
472    }
473
474    /// Create a memory mapped file.
475    ///
476    /// Note that the `from_` functions select optimized backing storage depending on data length, this function
477    /// always selects the slowest options, a file backed memory map.
478    #[cfg(ipc)]
479    pub fn new_memmap_blocking(write: impl FnOnce(&mut fs::File) -> io::Result<()>) -> io::Result<Self> {
480        let (name, mut file) = Self::create_memmap()?;
481        write(&mut file)?;
482        let mut permissions = file.metadata()?.permissions();
483        permissions.set_readonly(true);
484        #[cfg(unix)]
485        {
486            use std::os::unix::fs::PermissionsExt;
487            permissions.set_mode(0o400);
488        }
489        file.set_permissions(permissions)?;
490
491        drop(file);
492        let map = IpcMemMap::read(name, None)?;
493        Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
494    }
495    #[cfg(ipc)]
496    fn create_memmap() -> io::Result<(PathBuf, fs::File)> {
497        static MEMMAP_DIR: Mutex<usize> = Mutex::new(0);
498        let mut count = MEMMAP_DIR.lock();
499
500        if *count == 0 {
501            zng_env::on_process_exit(|_| {
502                IpcBytes::cleanup_memmap_storage();
503            });
504        }
505
506        let dir = zng_env::cache("zng-task-ipc-mem").join(std::process::id().to_string());
507        fs::create_dir_all(&dir)?;
508        let mut name = dir.join(count.to_string());
509        if *count < usize::MAX {
510            *count += 1;
511        } else {
512            // very cold path, in practice the running process will die long before this
513            for i in 0..usize::MAX {
514                name = dir.join(i.to_string());
515                if !name.exists() {
516                    break;
517                }
518            }
519            if name.exists() {
520                return Err(io::Error::new(io::ErrorKind::StorageFull, ""));
521            }
522        };
523
524        // read because some callers create a MmapMut
525        let file = fs::OpenOptions::new()
526            .create(true)
527            .read(true)
528            .write(true)
529            .truncate(true)
530            .open(&name)?;
531        Ok((name, file))
532    }
533    #[cfg(ipc)]
534    fn cleanup_memmap_storage() {
535        if let Ok(dir) = fs::read_dir(zng_env::cache("zng-task-ipc-mem")) {
536            let entries: Vec<_> = dir.flatten().map(|e| e.path()).collect();
537            for entry in entries {
538                if entry.is_dir() {
539                    fs::remove_dir_all(entry).ok();
540                }
541            }
542        }
543    }
544
545    /// Memory map an existing file.
546    ///
547    /// The `range` defines the slice of the `file` that will be mapped. Returns [`io::ErrorKind::UnexpectedEof`] if the file does not have enough bytes.
548    ///
549    /// # Safety
550    ///
551    /// Caller must ensure the `file` is not modified while all clones of the `IpcBytes` exists in the current process and others.
552    ///
553    /// Note that the safe [`new_memmap`] function assures safety by retaining a read lock (Windows) and restricting access rights (Unix)
554    /// so that the file data is as read-only as the static data in the current executable file.
555    ///
556    /// [`new_memmap`]: Self::new_memmap
557    #[cfg(ipc)]
558    pub unsafe fn open_memmap_blocking(file: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
559        let read_handle = fs::File::open(&file)?;
560        read_handle.lock_shared()?;
561        let len = read_handle.metadata()?.len();
562        if let Some(range) = &range
563            && len < range.end as u64
564        {
565            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "file length < range.end"));
566        }
567        // SAFETY: up to the caller.
568        let map = unsafe { memmap2::Mmap::map(&read_handle) }?;
569
570        let range = range.unwrap_or_else(|| 0..map.len());
571
572        Ok(Self(Arc::new(IpcBytesData::MemMap(IpcMemMap {
573            name: file,
574            range,
575            read_handle: Some(read_handle),
576            is_custom: true,
577            map: Some(map),
578        }))))
579    }
580}
581
582impl AsRef<[u8]> for IpcBytes {
583    fn as_ref(&self) -> &[u8] {
584        &self[..]
585    }
586}
587impl Default for IpcBytes {
588    fn default() -> Self {
589        Self::empty()
590    }
591}
592impl PartialEq for IpcBytes {
593    fn eq(&self, other: &Self) -> bool {
594        self.ptr_eq(other) || self[..] == other[..]
595    }
596}
597#[cfg(ipc)]
598impl IpcMemMap {
599    fn read(name: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
600        let read_handle = fs::File::open(&name)?;
601        read_handle.lock_shared()?;
602        // SAFETY: File is marked read-only and a read lock is held for it.
603        let map = unsafe { memmap2::Mmap::map(&read_handle) }?;
604
605        let range = range.unwrap_or_else(|| 0..map.len());
606
607        Ok(IpcMemMap {
608            name,
609            range,
610            is_custom: false,
611            read_handle: Some(read_handle),
612            map: Some(map),
613        })
614    }
615}
616#[cfg(ipc)]
617impl Serialize for IpcMemMap {
618    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
619    where
620        S: serde::Serializer,
621    {
622        (&self.name, self.range.clone()).serialize(serializer)
623    }
624}
625#[cfg(ipc)]
626impl<'de> Deserialize<'de> for IpcMemMap {
627    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
628    where
629        D: serde::Deserializer<'de>,
630    {
631        let (name, range) = <(PathBuf, ops::Range<usize>)>::deserialize(deserializer)?;
632        IpcMemMap::read(name, Some(range)).map_err(|e| serde::de::Error::custom(format!("cannot load ipc memory map file, {e}")))
633    }
634}
635#[cfg(ipc)]
636impl Drop for IpcMemMap {
637    fn drop(&mut self) {
638        self.map.take();
639        self.read_handle.take();
640        if !self.is_custom {
641            std::fs::remove_file(&self.name).ok();
642        }
643    }
644}
645
646impl Serialize for IpcBytes {
647    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
648    where
649        S: serde::Serializer,
650    {
651        #[cfg(ipc)]
652        {
653            if is_ipc_serialization() {
654                match &*self.0 {
655                    IpcBytesData::Heap(b) => serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&b[..])),
656                    IpcBytesData::AnonMemMap(b) => serializer.serialize_newtype_variant("IpcBytes", 1, "AnonMemMap", b),
657                    IpcBytesData::MemMap(b) => {
658                        // need to keep alive until other process is also holding it, so we send
659                        // a sender for the other process to signal received.
660                        let (sender, mut recv) = crate::channel::ipc_unbounded::<()>()
661                            .map_err(|e| serde::ser::Error::custom(format!("cannot serialize memmap bytes for ipc, {e}")))?;
662
663                        let r = serializer.serialize_newtype_variant("IpcBytes", 2, "MemMap", &(b, sender))?;
664                        let hold = self.clone();
665                        crate::spawn_wait(move || {
666                            if let Err(e) = recv.recv_blocking() {
667                                tracing::error!("IpcBytes memmap completion signal not received, {e}")
668                            }
669                            drop(hold);
670                        });
671                        Ok(r)
672                    }
673                }
674            } else {
675                serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&self[..]))
676            }
677        }
678        #[cfg(not(ipc))]
679        {
680            serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&self[..]))
681        }
682    }
683}
684impl<'de> Deserialize<'de> for IpcBytes {
685    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
686    where
687        D: serde::Deserializer<'de>,
688    {
689        #[derive(Deserialize)]
690        enum VariantId {
691            Heap,
692            #[cfg(ipc)]
693            AnonMemMap,
694            #[cfg(ipc)]
695            MemMap,
696        }
697
698        struct EnumVisitor;
699        impl<'de> serde::de::Visitor<'de> for EnumVisitor {
700            type Value = IpcBytes;
701
702            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
703                write!(f, "IpcBytes variant")
704            }
705
706            fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
707            where
708                A: serde::de::EnumAccess<'de>,
709            {
710                let (variant, access) = data.variant::<VariantId>()?;
711                match variant {
712                    VariantId::Heap => access.newtype_variant_seed(ByteSliceVisitor),
713                    #[cfg(ipc)]
714                    VariantId::AnonMemMap => Ok(IpcBytes(Arc::new(IpcBytesData::AnonMemMap(access.newtype_variant()?)))),
715                    #[cfg(ipc)]
716                    VariantId::MemMap => {
717                        let (memmap, mut completion_sender): (IpcMemMap, crate::channel::IpcSender<()>) = access.newtype_variant()?;
718                        completion_sender.send_blocking(()).map_err(|e| {
719                            serde::de::Error::custom(format!("cannot deserialize memmap bytes, completion signal failed, {e}"))
720                        })?;
721                        Ok(IpcBytes(Arc::new(IpcBytesData::MemMap(memmap))))
722                    }
723                }
724            }
725        }
726        struct ByteSliceVisitor;
727        impl<'de> serde::de::Visitor<'de> for ByteSliceVisitor {
728            type Value = IpcBytes;
729
730            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
731                write!(f, "byte buffer")
732            }
733
734            fn visit_borrowed_bytes<E>(self, v: &'de [u8]) -> Result<Self::Value, E>
735            where
736                E: serde::de::Error,
737            {
738                IpcBytes::from_slice_blocking(v).map_err(serde::de::Error::custom)
739            }
740
741            fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
742            where
743                E: serde::de::Error,
744            {
745                IpcBytes::from_slice_blocking(v).map_err(serde::de::Error::custom)
746            }
747
748            fn visit_byte_buf<E>(self, v: Vec<u8>) -> Result<Self::Value, E>
749            where
750                E: serde::de::Error,
751            {
752                IpcBytes::from_vec_blocking(v).map_err(serde::de::Error::custom)
753            }
754        }
755        impl<'de> serde::de::DeserializeSeed<'de> for ByteSliceVisitor {
756            type Value = IpcBytes;
757
758            fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
759            where
760                D: serde::Deserializer<'de>,
761            {
762                deserializer.deserialize_bytes(ByteSliceVisitor)
763            }
764        }
765
766        #[cfg(ipc)]
767        {
768            deserializer.deserialize_enum("IpcBytes", &["Heap", "AnonMemMap", "MemMap"], EnumVisitor)
769        }
770        #[cfg(not(ipc))]
771        {
772            deserializer.deserialize_enum("IpcBytes", &["Heap"], EnumVisitor)
773        }
774    }
775}
776
777/// Enables special serialization of memory mapped files for the `serialize` call.
778///
779/// IPC channels like [`IpcSender`] serialize messages inside this context to support [`IpcBytes`] fast memory map sharing across processes.
780///
781/// You can use the [`is_ipc_serialization`] to check if inside context.
782///
783/// [`IpcSender`]: super::IpcSender
784#[cfg(ipc)]
785pub fn with_ipc_serialization<R>(serialize: impl FnOnce() -> R) -> R {
786    let parent = IPC_SERIALIZATION.replace(true);
787    let _clean = RunOnDrop::new(|| IPC_SERIALIZATION.set(parent));
788    serialize()
789}
790
791/// Checks if is inside [`with_ipc_serialization`].
792#[cfg(ipc)]
793pub fn is_ipc_serialization() -> bool {
794    IPC_SERIALIZATION.get()
795}
796
797#[cfg(ipc)]
798thread_local! {
799    static IPC_SERIALIZATION: Cell<bool> = const { Cell::new(false) };
800}
801
802impl IpcBytes {
803    /// Create a weak in process reference.
804    ///
805    /// Note that the weak reference cannot upgrade if only another process holds a strong reference.
806    pub fn downgrade(&self) -> WeakIpcBytes {
807        WeakIpcBytes(Arc::downgrade(&self.0))
808    }
809}
810
811/// Weak reference to an in process [`IpcBytes`].
812pub struct WeakIpcBytes(Weak<IpcBytesData>);
813impl WeakIpcBytes {
814    /// Get strong reference if any exists in the process.
815    pub fn upgrade(&self) -> Option<IpcBytes> {
816        self.0.upgrade().map(IpcBytes)
817    }
818
819    /// Count of strong references in the process.
820    pub fn strong_count(&self) -> usize {
821        self.0.strong_count()
822    }
823}
824
825/// Represents an async [`IpcBytes`] writer.
826///
827/// Use [`IpcBytes::new_writer`] to start writing.
828pub struct IpcBytesWriter {
829    inner: blocking::Unblock<IpcBytesWriterBlocking>,
830}
831impl IpcBytesWriter {
832    /// Finish writing and move data to a shareable [`IpcBytes`].
833    pub async fn finish(self) -> std::io::Result<IpcBytes> {
834        let inner = self.inner.into_inner().await;
835        blocking::unblock(move || inner.finish()).await
836    }
837
838    /// Mode data to an exclusive mutable [`IpcBytes`] that can be further modified, but not resized.
839    pub async fn finish_mut(self) -> std::io::Result<IpcBytesMut> {
840        let inner = self.inner.into_inner().await;
841        blocking::unblock(move || inner.finish_mut()).await
842    }
843}
844impl crate::io::AsyncWrite for IpcBytesWriter {
845    fn poll_write(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8]) -> std::task::Poll<io::Result<usize>> {
846        crate::io::AsyncWrite::poll_write(Pin::new(&mut Pin::get_mut(self).inner), cx, buf)
847    }
848
849    fn poll_flush(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
850        crate::io::AsyncWrite::poll_flush(Pin::new(&mut Pin::get_mut(self).inner), cx)
851    }
852
853    fn poll_close(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
854        crate::io::AsyncWrite::poll_close(Pin::new(&mut Pin::get_mut(self).inner), cx)
855    }
856}
857impl crate::io::AsyncSeek for IpcBytesWriter {
858    fn poll_seek(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, pos: io::SeekFrom) -> std::task::Poll<io::Result<u64>> {
859        crate::io::AsyncSeek::poll_seek(Pin::new(&mut Pin::get_mut(self).inner), cx, pos)
860    }
861}
862
863/// Represents a blocking [`IpcBytes`] writer.
864///
865/// Use [`IpcBytes::new_writer_blocking`] to start writing.
866pub struct IpcBytesWriterBlocking {
867    #[cfg(ipc)]
868    heap_buf: Vec<u8>,
869    #[cfg(ipc)]
870    memmap: Option<(PathBuf, std::fs::File)>,
871
872    #[cfg(not(ipc))]
873    heap_buf: std::io::Cursor<Vec<u8>>,
874}
875impl IpcBytesWriterBlocking {
876    /// Finish writing and move data to a shareable [`IpcBytes`].
877    pub fn finish(self) -> std::io::Result<IpcBytes> {
878        let m = self.finish_mut()?;
879        m.finish_blocking()
880    }
881
882    /// Mode data to an exclusive mutable [`IpcBytes`] that can be further modified, but not resized.
883    pub fn finish_mut(mut self) -> std::io::Result<IpcBytesMut> {
884        self.flush()?;
885        #[cfg(ipc)]
886        {
887            let inner = match self.memmap {
888                Some((name, write_handle)) => {
889                    // SAFETY: we hold write lock
890                    let map = unsafe { memmap2::MmapMut::map_mut(&write_handle) }?;
891                    IpcBytesMutInner::MemMap { name, map, write_handle }
892                }
893                None => {
894                    if self.heap_buf.len() > IpcBytes::INLINE_MAX {
895                        IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_bytes(&self.heap_buf))
896                    } else {
897                        IpcBytesMutInner::Heap(self.heap_buf)
898                    }
899                }
900            };
901            Ok(IpcBytesMut { inner })
902        }
903        #[cfg(not(ipc))]
904        {
905            let inner = IpcBytesMutInner::Heap(self.heap_buf.into_inner());
906            Ok(IpcBytesMut { inner })
907        }
908    }
909
910    #[cfg(ipc)]
911    fn alloc_memmap_file(&mut self) -> io::Result<()> {
912        if self.memmap.is_none() {
913            let (name, file) = IpcBytes::create_memmap()?;
914            file.lock()?;
915            #[cfg(unix)]
916            {
917                let mut permissions = file.metadata()?.permissions();
918                use std::os::unix::fs::PermissionsExt;
919                permissions.set_mode(0o600);
920                file.set_permissions(permissions)?;
921            }
922            self.memmap = Some((name, file));
923        }
924        let file = &mut self.memmap.as_mut().unwrap().1;
925
926        file.write_all(&self.heap_buf)?;
927        // already allocated UNNAMED_MAX, continue using it as a large buffer
928        self.heap_buf.clear();
929        Ok(())
930    }
931}
932impl std::io::Write for IpcBytesWriterBlocking {
933    fn write(&mut self, write_buf: &[u8]) -> io::Result<usize> {
934        #[cfg(ipc)]
935        {
936            if self.heap_buf.len() + write_buf.len() > IpcBytes::UNNAMED_MAX {
937                // write exceed heap buffer, convert to memmap or flush to existing memmap
938                self.alloc_memmap_file()?;
939
940                if write_buf.len() > IpcBytes::UNNAMED_MAX {
941                    // writing massive payload, skip buffer
942                    self.memmap.as_mut().unwrap().1.write_all(write_buf)?;
943                } else {
944                    self.heap_buf.extend_from_slice(write_buf);
945                }
946            } else {
947                if self.memmap.is_none() {
948                    // heap buffer not fully allocated yet, ensure we only allocate up to UNNAMED_MAX
949                    self.heap_buf
950                        .reserve_exact((self.heap_buf.capacity().max(1024) * 2).min(IpcBytes::UNNAMED_MAX));
951                }
952                self.heap_buf.extend_from_slice(write_buf);
953            }
954
955            Ok(write_buf.len())
956        }
957
958        #[cfg(not(ipc))]
959        {
960            std::io::Write::write(&mut self.heap_buf, write_buf)
961        }
962    }
963
964    fn flush(&mut self) -> io::Result<()> {
965        #[cfg(ipc)]
966        if let Some((_, file)) = &mut self.memmap {
967            if !self.heap_buf.is_empty() {
968                file.write_all(&self.heap_buf)?;
969                self.heap_buf.clear();
970            }
971            file.flush()?;
972        }
973        Ok(())
974    }
975}
976impl std::io::Seek for IpcBytesWriterBlocking {
977    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
978        #[cfg(ipc)]
979        {
980            self.alloc_memmap_file()?;
981            let (_, file) = self.memmap.as_mut().unwrap();
982            if !self.heap_buf.is_empty() {
983                file.write_all(&self.heap_buf)?;
984                self.heap_buf.clear();
985            }
986            file.seek(pos)
987        }
988        #[cfg(not(ipc))]
989        {
990            std::io::Seek::seek(&mut self.heap_buf, pos)
991        }
992    }
993}
994
995enum IpcBytesMutInner {
996    Heap(Vec<u8>),
997    #[cfg(ipc)]
998    AnonMemMap(IpcSharedMemory),
999    #[cfg(ipc)]
1000    MemMap {
1001        name: PathBuf,
1002        map: memmap2::MmapMut,
1003        write_handle: std::fs::File,
1004    },
1005}
1006
1007/// Represents preallocated exclusive mutable memory for a new [`IpcBytes`].
1008///
1009/// Use [`IpcBytes::new_mut`] or [`IpcBytes::new_mut_blocking`] to allocate.
1010pub struct IpcBytesMut {
1011    inner: IpcBytesMutInner,
1012}
1013impl ops::Deref for IpcBytesMut {
1014    type Target = [u8];
1015
1016    fn deref(&self) -> &Self::Target {
1017        match &self.inner {
1018            IpcBytesMutInner::Heap(v) => &v[..],
1019            #[cfg(ipc)]
1020            IpcBytesMutInner::AnonMemMap(m) => &m[..],
1021            #[cfg(ipc)]
1022            IpcBytesMutInner::MemMap { map, .. } => &map[..],
1023        }
1024    }
1025}
1026impl ops::DerefMut for IpcBytesMut {
1027    fn deref_mut(&mut self) -> &mut Self::Target {
1028        match &mut self.inner {
1029            IpcBytesMutInner::Heap(v) => &mut v[..],
1030            #[cfg(ipc)]
1031            IpcBytesMutInner::AnonMemMap(m) => {
1032                // SAFETY: we are the only reference to the map
1033                unsafe { m.deref_mut() }
1034            }
1035            #[cfg(ipc)]
1036            IpcBytesMutInner::MemMap { map, .. } => &mut map[..],
1037        }
1038    }
1039}
1040impl fmt::Debug for IpcBytesMut {
1041    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1042        write!(f, "IpcBytesMut(<{} bytes>)", self.len())
1043    }
1044}
1045impl IpcBytesMut {
1046    /// Uses `buf` or copies it to exclusive mutable memory.
1047    pub async fn from_vec(buf: Vec<u8>) -> io::Result<Self> {
1048        #[cfg(ipc)]
1049        if buf.len() <= IpcBytes::INLINE_MAX {
1050            Ok(Self {
1051                inner: IpcBytesMutInner::Heap(buf),
1052            })
1053        } else {
1054            blocking::unblock(move || {
1055                let mut b = IpcBytes::new_mut_blocking(buf.len())?;
1056                b[..].copy_from_slice(&buf);
1057                Ok(b)
1058            })
1059            .await
1060        }
1061        #[cfg(not(ipc))]
1062        {
1063            Ok(Self {
1064                inner: IpcBytesMutInner::Heap(buf),
1065            })
1066        }
1067    }
1068
1069    /// Convert to immutable shareable [`IpcBytes`].
1070    pub async fn finish(mut self) -> io::Result<IpcBytes> {
1071        let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
1072            IpcBytesMutInner::Heap(v) => IpcBytesData::Heap(v),
1073            #[cfg(ipc)]
1074            IpcBytesMutInner::AnonMemMap(m) => IpcBytesData::AnonMemMap(m),
1075            #[cfg(ipc)]
1076            IpcBytesMutInner::MemMap { name, map, write_handle } => {
1077                blocking::unblock(move || Self::finish_memmap(name, map, write_handle)).await?
1078            }
1079        };
1080        Ok(IpcBytes(Arc::new(data)))
1081    }
1082
1083    #[cfg(ipc)]
1084    fn finish_memmap(name: PathBuf, map: memmap2::MmapMut, write_handle: fs::File) -> Result<IpcBytesData, io::Error> {
1085        let len = map.len();
1086        write_handle.unlock()?;
1087        let map = map.make_read_only()?;
1088        let mut permissions = write_handle.metadata()?.permissions();
1089        permissions.set_readonly(true);
1090        #[cfg(unix)]
1091        {
1092            use std::os::unix::fs::PermissionsExt;
1093            permissions.set_mode(0o400);
1094        }
1095        write_handle.set_permissions(permissions)?;
1096        drop(write_handle);
1097        let read_handle = std::fs::File::open(&name)?;
1098        read_handle.lock_shared()?;
1099        Ok(IpcBytesData::MemMap(IpcMemMap {
1100            name,
1101            range: 0..len,
1102            is_custom: false,
1103            map: Some(map),
1104            read_handle: Some(read_handle),
1105        }))
1106    }
1107}
1108impl IpcBytesMut {
1109    /// Uses `buf` or copies it to exclusive mutable memory.
1110    pub fn from_vec_blocking(buf: Vec<u8>) -> io::Result<Self> {
1111        #[cfg(ipc)]
1112        if buf.len() <= IpcBytes::INLINE_MAX {
1113            Ok(Self {
1114                inner: IpcBytesMutInner::Heap(buf),
1115            })
1116        } else {
1117            let mut b = IpcBytes::new_mut_blocking(buf.len())?;
1118            b[..].copy_from_slice(&buf);
1119            Ok(b)
1120        }
1121        #[cfg(not(ipc))]
1122        {
1123            Ok(Self {
1124                inner: IpcBytesMutInner::Heap(buf),
1125            })
1126        }
1127    }
1128
1129    /// Convert to immutable shareable [`IpcBytes`].
1130    pub fn finish_blocking(mut self) -> io::Result<IpcBytes> {
1131        let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
1132            IpcBytesMutInner::Heap(v) => IpcBytesData::Heap(v),
1133            #[cfg(ipc)]
1134            IpcBytesMutInner::AnonMemMap(m) => IpcBytesData::AnonMemMap(m),
1135            #[cfg(ipc)]
1136            IpcBytesMutInner::MemMap { name, map, write_handle } => Self::finish_memmap(name, map, write_handle)?,
1137        };
1138        Ok(IpcBytes(Arc::new(data)))
1139    }
1140}
1141#[cfg(ipc)]
1142impl Drop for IpcBytesMut {
1143    fn drop(&mut self) {
1144        if let IpcBytesMutInner::MemMap { name, map, write_handle } = std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
1145            drop(map);
1146            drop(write_handle);
1147            std::fs::remove_file(name).ok();
1148        }
1149    }
1150}