1#![cfg_attr(not(ipc), allow(unused))]
2
3use std::{
4 cell::Cell,
5 fmt, fs,
6 io::{self, Read, Write},
7 iter::FusedIterator,
8 marker::PhantomData,
9 mem::MaybeUninit,
10 ops,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, Weak},
14};
15
16use futures_lite::{AsyncReadExt, AsyncWriteExt as _};
17#[cfg(ipc)]
18use ipc_channel::ipc::IpcSharedMemory;
19use parking_lot::Mutex;
20use serde::{Deserialize, Serialize, de::VariantAccess};
21use zng_app_context::RunOnDrop;
22
/// Immutable, reference-counted byte buffer optimized for inter-process transfer.
///
/// Cheap to clone, derefs to `[u8]`. Storage is selected by payload size, see
/// [`IpcBytesData`].
#[derive(Clone)]
#[repr(transparent)]
pub struct IpcBytes(Arc<IpcBytesData>);
/// Backing storage for [`IpcBytes`].
enum IpcBytesData {
    /// Small payloads, serialized inline.
    Heap(Vec<u8>),
    /// Medium payloads in anonymous shared memory.
    #[cfg(ipc)]
    AnonMemMap(IpcSharedMemory),
    /// Large payloads in a named memory mapped file.
    #[cfg(ipc)]
    MemMap(IpcMemMap),
    /// `MemMap` received from another process, the file is re-mapped by a
    /// background task; readers block on `map` until it is set.
    #[cfg(ipc)]
    MemMapAsyncDeserialize {
        // set by the background task spawned in `Deserialize`
        map: std::sync::OnceLock<IpcMemMap>,
        // length is known upfront so `len`/`is_empty` don't block
        len: usize,
    },
}
62impl fmt::Debug for IpcBytes {
63 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64 write!(f, "IpcBytes(<{} bytes>)", self.len())
65 }
66}
impl ops::Deref for IpcBytes {
    type Target = [u8];

    /// Derefs to the byte slice.
    ///
    /// Note that the `MemMapAsyncDeserialize` arm blocks the calling thread
    /// until the background reconnect task has set the map.
    fn deref(&self) -> &Self::Target {
        match &*self.0 {
            IpcBytesData::Heap(i) => i,
            #[cfg(ipc)]
            IpcBytesData::AnonMemMap(m) => m,
            #[cfg(ipc)]
            IpcBytesData::MemMap(f) => f,
            #[cfg(ipc)]
            // blocks until the task spawned in `Deserialize` finishes
            IpcBytesData::MemMapAsyncDeserialize { map, .. } => map.wait(),
        }
    }
}
82
83impl IpcBytes {
84 pub fn empty() -> Self {
86 IpcBytes(Arc::new(IpcBytesData::Heap(vec![])))
87 }
88}
89impl IpcBytes {
91 pub async fn new_writer() -> IpcBytesWriter {
93 IpcBytesWriter {
94 inner: blocking::Unblock::new(Self::new_writer_blocking()),
95 }
96 }
97
98 pub async fn new_mut(len: usize) -> io::Result<IpcBytesMut> {
100 IpcBytesMut::new(len).await
101 }
102
103 pub async fn from_vec(data: Vec<u8>) -> io::Result<Self> {
105 blocking::unblock(move || Self::from_vec_blocking(data)).await
106 }
107
108 pub async fn from_iter(iter: impl Iterator<Item = u8>) -> io::Result<Self> {
118 #[cfg(ipc)]
119 {
120 let (min, max) = iter.size_hint();
121 if let Some(max) = max {
122 if max <= Self::INLINE_MAX {
123 return Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))));
124 } else if max == min {
125 let mut r = IpcBytes::new_mut(max).await?;
126 let mut actual_len = 0;
127 for (i, b) in r.iter_mut().zip(iter) {
128 *i = b;
129 actual_len += 1;
130 }
131 r.truncate(actual_len);
132 return r.finish().await;
133 }
134 }
135
136 let mut writer = Self::new_writer().await;
137 for b in iter {
138 writer.write_all(&[b]).await?;
139 }
140 writer.finish().await
141 }
142
143 #[cfg(not(ipc))]
144 {
145 Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))))
146 }
147 }
148
149 pub async fn from_read(data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
151 #[cfg(ipc)]
152 {
153 Self::from_read_ipc(data).await
154 }
155 #[cfg(not(ipc))]
156 {
157 let mut data = data;
158 let mut buf = vec![];
159 data.read_to_end(&mut buf).await;
160 Self::from_vec(buf).await
161 }
162 }
163 #[cfg(ipc)]
164 async fn from_read_ipc(mut data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
165 let mut buf = vec![0u8; Self::INLINE_MAX + 1];
166 let mut len = 0;
167
168 loop {
170 match data.read(&mut buf[len..]).await {
171 Ok(l) => {
172 if l == 0 {
173 buf.truncate(len);
175 return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
176 } else {
177 len += l;
178 if len == Self::INLINE_MAX + 1 {
179 break;
181 }
182 }
183 }
184 Err(e) => match e.kind() {
185 io::ErrorKind::WouldBlock => continue,
186 _ => return Err(e),
187 },
188 }
189 }
190
191 buf.resize(Self::UNNAMED_MAX + 1, 0);
193 loop {
194 match data.read(&mut buf[len..]).await {
195 Ok(l) => {
196 if l == 0 {
197 return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
199 } else {
200 len += l;
201 if len == Self::UNNAMED_MAX + 1 {
202 break;
204 }
205 }
206 }
207 Err(e) => match e.kind() {
208 io::ErrorKind::WouldBlock => continue,
209 _ => return Err(e),
210 },
211 }
212 }
213
214 Self::new_memmap(async |m| {
216 use futures_lite::AsyncWriteExt as _;
217
218 m.write_all(&buf).await?;
219 crate::io::copy(data, m).await?;
220 Ok(())
221 })
222 .await
223 }
224
225 pub async fn from_path(path: PathBuf) -> io::Result<Self> {
227 let file = crate::fs::File::open(path).await?;
228 Self::from_file(file).await
229 }
230 pub async fn from_file(mut file: crate::fs::File) -> io::Result<Self> {
232 #[cfg(ipc)]
233 {
234 let len = file.metadata().await?.len();
235 if len <= Self::UNNAMED_MAX as u64 {
236 let mut buf = vec![0u8; len as usize];
237 file.read_exact(&mut buf).await?;
238 Self::from_vec_blocking(buf)
239 } else {
240 Self::new_memmap(async move |m| {
241 crate::io::copy(&mut file, m).await?;
242 Ok(())
243 })
244 .await
245 }
246 }
247 #[cfg(not(ipc))]
248 {
249 let mut buf = vec![];
250 file.read_to_end(&mut buf).await?;
251 Self::from_vec_blocking(buf)
252 }
253 }
254
255 #[cfg(ipc)]
260 pub async fn new_memmap(write: impl AsyncFnOnce(&mut crate::fs::File) -> io::Result<()>) -> io::Result<Self> {
261 let (name, file) = blocking::unblock(Self::create_memmap).await?;
262 let mut file = crate::fs::File::from(file);
263 write(&mut file).await?;
264
265 let mut permissions = file.metadata().await?.permissions();
266 permissions.set_readonly(true);
267 #[cfg(unix)]
268 {
269 use std::os::unix::fs::PermissionsExt;
270 permissions.set_mode(0o400);
271 }
272 file.set_permissions(permissions).await?;
273
274 blocking::unblock(move || {
275 drop(file);
276 let map = IpcMemMap::read(name, None)?;
277 Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
278 })
279 .await
280 }
281
282 #[cfg(ipc)]
295 pub async unsafe fn open_memmap(file: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
296 blocking::unblock(move || {
297 unsafe { Self::open_memmap_blocking(file, range) }
299 })
300 .await
301 }
302
303 pub fn ptr_eq(&self, other: &Self) -> bool {
305 let a = &self[..];
306 let b = &other[..];
307 (std::ptr::eq(a, b) && a.len() == b.len()) || (a.is_empty() && b.is_empty())
308 }
309
310 #[cfg(ipc)]
311 const INLINE_MAX: usize = 64 * 1024; #[cfg(ipc)]
313 const UNNAMED_MAX: usize = 128 * 1024 * 1024; }
315
impl IpcBytes {
    /// Blocking version of [`IpcBytes::new_writer`].
    pub fn new_writer_blocking() -> IpcBytesWriterBlocking {
        IpcBytesWriterBlocking {
            #[cfg(ipc)]
            heap_buf: vec![],
            #[cfg(ipc)]
            memmap: None,

            #[cfg(not(ipc))]
            heap_buf: std::io::Cursor::new(vec![]),
        }
    }

    /// Blocking version of [`IpcBytes::new_mut`].
    pub fn new_mut_blocking(len: usize) -> io::Result<IpcBytesMut> {
        IpcBytesMut::new_blocking(len)
    }

    /// Copies `data` into the best fitting storage for its length.
    pub fn from_slice_blocking(data: &[u8]) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            if data.len() <= Self::INLINE_MAX {
                Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
            } else if data.len() <= Self::UNNAMED_MAX {
                Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(data)))))
            } else {
                Self::new_memmap_blocking(|m| m.write_all(data))
            }
        }
        #[cfg(not(ipc))]
        {
            Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
        }
    }

    /// Converts the vec, reusing the allocation when it fits inline.
    pub fn from_vec_blocking(data: Vec<u8>) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            if data.len() <= Self::INLINE_MAX {
                Ok(Self(Arc::new(IpcBytesData::Heap(data))))
            } else {
                Self::from_slice_blocking(&data)
            }
        }
        #[cfg(not(ipc))]
        {
            Ok(Self(Arc::new(IpcBytesData::Heap(data))))
        }
    }

    /// Blocking version of [`IpcBytes::from_iter`].
    pub fn from_iter_blocking(iter: impl Iterator<Item = u8>) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            let (min, max) = iter.size_hint();
            if let Some(max) = max {
                if max <= Self::INLINE_MAX {
                    return Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))));
                } else if max == min {
                    // exact size known, fill a preallocated buffer
                    let mut r = IpcBytes::new_mut_blocking(max)?;
                    let mut actual_len = 0;
                    for (i, b) in r.iter_mut().zip(iter) {
                        *i = b;
                        actual_len += 1;
                    }
                    r.truncate(actual_len);
                    return r.finish_blocking();
                }
            }

            // unknown size, stream through the writer
            let mut writer = Self::new_writer_blocking();
            for b in iter {
                writer.write_all(&[b])?;
            }
            writer.finish()
        }
        #[cfg(not(ipc))]
        {
            Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))))
        }
    }

    /// Blocking version of [`IpcBytes::from_read`].
    pub fn from_read_blocking(data: &mut dyn io::Read) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            Self::from_read_blocking_ipc(data)
        }
        #[cfg(not(ipc))]
        {
            let mut buf = vec![];
            data.read_to_end(&mut buf)?;
            Self::from_vec_blocking(buf)
        }
    }
    /// Reads in stages, upgrading storage as the length crosses each threshold.
    #[cfg(ipc)]
    fn from_read_blocking_ipc(data: &mut dyn io::Read) -> io::Result<Self> {
        // stage 1: read up to the inline limit
        let mut buf = vec![0u8; Self::INLINE_MAX + 1];
        let mut len = 0;

        loop {
            match data.read(&mut buf[len..]) {
                Ok(l) => {
                    if l == 0 {
                        // EOF, fits inline
                        buf.truncate(len);
                        return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
                    } else {
                        len += l;
                        if len == Self::INLINE_MAX + 1 {
                            break;
                        }
                    }
                }
                Err(e) => match e.kind() {
                    io::ErrorKind::WouldBlock => continue,
                    _ => return Err(e),
                },
            }
        }

        // stage 2: read up to the anonymous shared memory limit
        buf.resize(Self::UNNAMED_MAX + 1, 0);
        loop {
            match data.read(&mut buf[len..]) {
                Ok(l) => {
                    if l == 0 {
                        return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
                    } else {
                        len += l;
                        if len == Self::UNNAMED_MAX + 1 {
                            break;
                        }
                    }
                }
                Err(e) => match e.kind() {
                    io::ErrorKind::WouldBlock => continue,
                    _ => return Err(e),
                },
            }
        }

        // stage 3: too large for memory, stream the rest into a memmap file
        Self::new_memmap_blocking(|m| {
            m.write_all(&buf)?;
            io::copy(data, m)?;
            Ok(())
        })
    }

    /// Opens and reads the file at `path`.
    pub fn from_path_blocking(path: &Path) -> io::Result<Self> {
        let file = fs::File::open(path)?;
        Self::from_file_blocking(file)
    }
    /// Reads the already opened `file`, choosing storage from its length.
    pub fn from_file_blocking(mut file: fs::File) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            let len = file.metadata()?.len();
            if len <= Self::UNNAMED_MAX as u64 {
                let mut buf = vec![0u8; len as usize];
                file.read_exact(&mut buf)?;
                Self::from_vec_blocking(buf)
            } else {
                Self::new_memmap_blocking(|m| {
                    io::copy(&mut file, m)?;
                    Ok(())
                })
            }
        }
        #[cfg(not(ipc))]
        {
            let mut buf = vec![];
            file.read_to_end(&mut buf)?;
            Self::from_vec_blocking(buf)
        }
    }

    /// Creates a new memmap file, writes it using `write` and maps it read-only.
    #[cfg(ipc)]
    pub fn new_memmap_blocking(write: impl FnOnce(&mut fs::File) -> io::Result<()>) -> io::Result<Self> {
        let (name, mut file) = Self::create_memmap()?;
        write(&mut file)?;
        // make the file read-only before sharing it
        let mut permissions = file.metadata()?.permissions();
        permissions.set_readonly(true);
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            permissions.set_mode(0o400);
        }
        file.set_permissions(permissions)?;

        // drop the write handle, then re-open read-only and map
        drop(file);
        let map = IpcMemMap::read(name, None)?;
        Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
    }
    /// Creates a uniquely named file in this process's cache dir.
    #[cfg(ipc)]
    fn create_memmap() -> io::Result<(PathBuf, fs::File)> {
        // next file name, zero also marks first usage in this process
        static MEMMAP_DIR: Mutex<usize> = Mutex::new(0);
        let mut count = MEMMAP_DIR.lock();

        if *count == 0 {
            // first usage, schedule storage cleanup on process exit
            zng_env::on_process_exit(|_| {
                cleanup_memmap_storage();
            });
        }

        let dir = zng_env::cache("zng-task-ipc-mem").join(std::process::id().to_string());
        fs::create_dir_all(&dir)?;
        let mut name = dir.join(count.to_string());
        if *count < usize::MAX {
            *count += 1;
        } else {
            // counter saturated, probe for a free name
            for i in 0..usize::MAX {
                name = dir.join(i.to_string());
                if !name.exists() {
                    break;
                }
            }
            if name.exists() {
                return Err(io::Error::new(io::ErrorKind::StorageFull, ""));
            }
        };

        let mut opt = fs::OpenOptions::new();
        opt.create(true).read(true).write(true).truncate(true);
        #[cfg(windows)]
        {
            use std::os::windows::fs::OpenOptionsExt as _;
            // temporary attribute, the OS may keep the data in cache
            const FILE_ATTRIBUTE_TEMPORARY: u32 = 0x00000100;
            opt.attributes(FILE_ATTRIBUTE_TEMPORARY);
        }
        let file = opt.open(&name)?;
        Ok((name, file))
    }

    /// Maps an arbitrary file (or a `range` of it) as `IpcBytes`.
    ///
    /// The file is marked custom, so it is not deleted on drop, unlike maps
    /// created by this module.
    ///
    /// # Safety
    ///
    /// The caller must ensure the file is not modified while it is mapped.
    #[cfg(ipc)]
    pub unsafe fn open_memmap_blocking(file: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
        let read_handle = fs::File::open(&file)?;
        read_handle.lock_shared()?;
        let len = read_handle.metadata()?.len();
        if let Some(range) = &range
            && len < range.end as u64
        {
            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "file length < range.end"));
        }
        // SAFETY: upheld by the caller.
        let map = unsafe { memmap2::Mmap::map(&read_handle) }?;

        let range = range.unwrap_or_else(|| 0..map.len());

        Ok(Self(Arc::new(IpcBytesData::MemMap(IpcMemMap {
            name: file,
            range,
            map: IpcMemMapData::Connected(map, read_handle),
            is_custom: true,
        }))))
    }
}
610
impl IpcBytes {
    /// Byte length.
    ///
    /// Does not block for `MemMapAsyncDeserialize`, the length is stored upfront.
    pub fn len(&self) -> usize {
        match &*self.0 {
            IpcBytesData::Heap(b) => b.len(),
            #[cfg(ipc)]
            IpcBytesData::AnonMemMap(b) => b.len(),
            #[cfg(ipc)]
            IpcBytesData::MemMap(b) => b.len(),
            #[cfg(ipc)]
            IpcBytesData::MemMapAsyncDeserialize { len, .. } => *len,
        }
    }

    /// If the byte length is zero.
    pub fn is_empty(&self) -> bool {
        match &*self.0 {
            IpcBytesData::Heap(b) => b.is_empty(),
            #[cfg(ipc)]
            IpcBytesData::AnonMemMap(b) => b.is_empty(),
            #[cfg(ipc)]
            IpcBytesData::MemMap(b) => b.is_empty(),
            #[cfg(ipc)]
            IpcBytesData::MemMapAsyncDeserialize { len, .. } => *len == 0,
        }
    }
}
638
/// Removes the per-process memmap cache directories.
///
/// Registered to run on process exit by `create_memmap`; safe to call manually.
/// NOTE(review): this removes every process subdirectory under the cache, not
/// only the current process's — presumably intentional stale-cache cleanup,
/// confirm before changing.
pub fn cleanup_memmap_storage() {
    #[cfg(ipc)]
    if let Ok(dir) = fs::read_dir(zng_env::cache("zng-task-ipc-mem")) {
        // collect the paths first so the directory is not mutated while iterating it
        let entries: Vec<_> = dir.flatten().map(|e| e.path()).collect();
        for entry in entries {
            if entry.is_dir() {
                fs::remove_dir_all(entry).ok();
            }
        }
    }
}
654
655impl AsRef<[u8]> for IpcBytes {
656 fn as_ref(&self) -> &[u8] {
657 &self[..]
658 }
659}
660impl Default for IpcBytes {
661 fn default() -> Self {
662 Self::empty()
663 }
664}
665impl PartialEq for IpcBytes {
666 fn eq(&self, other: &Self) -> bool {
667 self.ptr_eq(other) || self[..] == other[..]
668 }
669}
670impl Eq for IpcBytes {}
/// Read-only memory mapped file, shared across processes by path.
#[cfg(ipc)]
struct IpcMemMap {
    // path of the backing file, used to re-open on the deserialize side
    name: PathBuf,
    // sub-range of the map exposed by `Deref`
    range: ops::Range<usize>,
    // custom maps (`open_memmap`) are caller owned, not deleted on drop
    is_custom: bool,
    map: IpcMemMapData,
}
/// Connection state of an [`IpcMemMap`].
#[cfg(ipc)]
enum IpcMemMapData {
    // handles held only to keep the file mapped and the shared lock alive
    #[allow(unused)] Connected(memmap2::Mmap, fs::File),
    // placeholder while a background task re-opens the file after deserialize
    AsyncDeserializing,
    // background re-open failed, `Deref` panics with this error
    AsyncDeserializeError(io::Error),
    // set by `Drop`, never dereferenced after this
    Dropped,
}
#[cfg(ipc)]
impl ops::Deref for IpcMemMap {
    type Target = [u8];

    /// Derefs to the mapped byte range.
    ///
    /// Panics if the deserialize-side reconnect failed; the other arms are
    /// unreachable because callers wait for the connect and never deref after drop.
    fn deref(&self) -> &[u8] {
        let IpcMemMap { range, map, .. } = self;
        match map {
            IpcMemMapData::Connected(mmap, _) => &mmap[range.clone()],
            IpcMemMapData::AsyncDeserializeError(e) => panic!("IpcBytes failed to reconnect with deserialized memmap file, {e}"),
            IpcMemMapData::AsyncDeserializing => unreachable!(),
            IpcMemMapData::Dropped => unreachable!(),
        }
    }
}
#[cfg(ipc)]
impl IpcMemMap {
    /// Opens `name` read-only, takes a shared file lock and memory maps it.
    fn read_map(name: &Path) -> io::Result<(memmap2::Mmap, fs::File)> {
        let read_handle = fs::File::open(name)?;
        read_handle.lock_shared()?;
        // SAFETY-NOTE(review): assumes writers are done with the file; creators
        // in this module set it read-only before sharing — confirm for custom maps.
        let map = unsafe { memmap2::Mmap::map(&read_handle) }?;

        Ok((map, read_handle))
    }

    /// Maps the file, exposing `range` or the full length.
    fn read(name: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
        let r = Self::read_map(&name)?;
        let range = range.unwrap_or_else(|| 0..r.0.len());
        Ok(IpcMemMap {
            name,
            range,
            is_custom: false,
            map: IpcMemMapData::Connected(r.0, r.1),
        })
    }

    /// Completes a deserialized instance by re-mapping the named file.
    ///
    /// On error the state becomes `AsyncDeserializeError` and `Deref` panics.
    fn finish_deserialize(&mut self) {
        self.map = match Self::read_map(&self.name) {
            Ok(r) => IpcMemMapData::Connected(r.0, r.1),
            Err(e) => IpcMemMapData::AsyncDeserializeError(e),
        };
    }
}
#[cfg(ipc)]
impl Serialize for IpcMemMap {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // only the file path and byte range cross the process boundary, the
        // receiver re-opens and re-maps the file on deserialize
        let repr = (&self.name, self.range.clone());
        repr.serialize(serializer)
    }
}
#[cfg(ipc)]
impl<'de> Deserialize<'de> for IpcMemMap {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // the (path, range) pair written by `Serialize`; the actual file is
        // re-mapped later by `finish_deserialize`
        let (name, range): (PathBuf, ops::Range<usize>) = Deserialize::deserialize(deserializer)?;
        Ok(Self {
            name,
            range,
            is_custom: false,
            map: IpcMemMapData::AsyncDeserializing,
        })
    }
}
#[cfg(ipc)]
impl Drop for IpcMemMap {
    fn drop(&mut self) {
        // release the map, file handle and shared lock before removing the file
        self.map = IpcMemMapData::Dropped;
        if !self.is_custom {
            // maps created by this module own their backing file; custom maps
            // opened with `open_memmap` are caller managed
            std::fs::remove_file(&self.name).ok();
        }
    }
}
761
impl Serialize for IpcBytes {
    /// Serializes inline as a `Heap` variant, unless inside
    /// [`with_ipc_serialization`], where shared memory and memmap storage are
    /// serialized by reference.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        #[cfg(ipc)]
        {
            if is_ipc_serialization() {
                match &*self.0 {
                    IpcBytesData::Heap(b) => serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&b[..])),
                    IpcBytesData::AnonMemMap(b) => serializer.serialize_newtype_variant("IpcBytes", 1, "AnonMemMap", b),
                    IpcBytesData::MemMap(_) | IpcBytesData::MemMapAsyncDeserialize { .. } => {
                        let b = match &*self.0 {
                            IpcBytesData::MemMap(b) => b,
                            // blocks until the background reconnect completes
                            IpcBytesData::MemMapAsyncDeserialize { map, .. } => map.wait(),
                            _ => unreachable!(),
                        };
                        // the receiver signals on this channel after it re-maps the
                        // file, the bytes are kept alive until then
                        let (sender, mut recv) = crate::channel::ipc_unbounded::<()>()
                            .map_err(|e| serde::ser::Error::custom(format!("cannot serialize memmap bytes for ipc, {e}")))?;

                        let r = serializer.serialize_newtype_variant("IpcBytes", 2, "MemMap", &(b, sender))?;
                        // hold a strong reference so `Drop` does not delete the
                        // backing file before the receiver maps it
                        let hold = self.clone();
                        blocking::unblock(move || {
                            if let Err(e) = recv.recv_blocking() {
                                tracing::error!("IpcBytes memmap completion signal not received, {e}")
                            }
                            drop(hold);
                        })
                        .detach();
                        Ok(r)
                    }
                }
            } else {
                // non-ipc serialization embeds the full bytes inline
                serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&self[..]))
            }
        }
        #[cfg(not(ipc))]
        {
            serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&self[..]))
        }
    }
}
impl<'de> Deserialize<'de> for IpcBytes {
    /// Deserializes the enum written by `Serialize`.
    ///
    /// The `MemMap` variant returns immediately and re-maps the file on a
    /// background task; readers block until the reconnect completes.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // mirrors the variant indices used by `Serialize`
        #[derive(Deserialize)]
        enum VariantId {
            Heap,
            #[cfg(ipc)]
            AnonMemMap,
            #[cfg(ipc)]
            MemMap,
        }

        struct EnumVisitor;
        impl<'de> serde::de::Visitor<'de> for EnumVisitor {
            type Value = IpcBytes;

            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                write!(f, "IpcBytes variant")
            }

            fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
            where
                A: serde::de::EnumAccess<'de>,
            {
                let (variant, access) = data.variant::<VariantId>()?;
                match variant {
                    VariantId::Heap => access.newtype_variant_seed(ByteSliceVisitor),
                    #[cfg(ipc)]
                    VariantId::AnonMemMap => Ok(IpcBytes(Arc::new(IpcBytesData::AnonMemMap(access.newtype_variant()?)))),
                    #[cfg(ipc)]
                    VariantId::MemMap => {
                        let (mut memmap, mut completion_sender): (IpcMemMap, crate::channel::IpcSender<()>) = access.newtype_variant()?;

                        // expose the bytes immediately, readers block on the
                        // `OnceLock` until the task below finishes the re-map
                        let ipc_bytes = IpcBytes(Arc::new(IpcBytesData::MemMapAsyncDeserialize {
                            map: std::sync::OnceLock::new(),
                            len: memmap.range.len(),
                        }));
                        let ipc_bytes_sender = ipc_bytes.0.clone();
                        blocking::unblock(move || {
                            memmap.finish_deserialize();
                            if let IpcMemMapData::AsyncDeserializeError(e) = &memmap.map {
                                tracing::error!("failed to reconnect with deserialized memmap file, will panic on first read, {e}");
                            }
                            // notify the sender side so it can release its strong
                            // reference to the backing file
                            if let Err(e) = completion_sender.send_blocking(()) {
                                tracing::error!("failed to send memmap deserialize completion signal, {e}");
                            }
                            match &*ipc_bytes_sender {
                                IpcBytesData::MemMapAsyncDeserialize { map, .. } => {
                                    // unblocks any readers waiting on the lock
                                    map.get_or_init(|| memmap);
                                }
                                _ => unreachable!(),
                            }
                        })
                        .detach();

                        Ok(ipc_bytes)
                    }
                }
            }
        }
        // visits inline byte payloads of the `Heap` variant
        struct ByteSliceVisitor;
        impl<'de> serde::de::Visitor<'de> for ByteSliceVisitor {
            type Value = IpcBytes;

            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                write!(f, "byte buffer")
            }

            fn visit_borrowed_bytes<E>(self, v: &'de [u8]) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                IpcBytes::from_slice_blocking(v).map_err(serde::de::Error::custom)
            }

            fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                IpcBytes::from_slice_blocking(v).map_err(serde::de::Error::custom)
            }

            fn visit_byte_buf<E>(self, v: Vec<u8>) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                // owned buffer, can be reused without copy
                IpcBytes::from_vec_blocking(v).map_err(serde::de::Error::custom)
            }
        }
        impl<'de> serde::de::DeserializeSeed<'de> for ByteSliceVisitor {
            type Value = IpcBytes;

            fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
            where
                D: serde::Deserializer<'de>,
            {
                deserializer.deserialize_bytes(ByteSliceVisitor)
            }
        }

        #[cfg(ipc)]
        {
            deserializer.deserialize_enum("IpcBytes", &["Heap", "AnonMemMap", "MemMap"], EnumVisitor)
        }
        #[cfg(not(ipc))]
        {
            deserializer.deserialize_enum("IpcBytes", &["Heap"], EnumVisitor)
        }
    }
}
918
/// Calls `serialize` with [`is_ipc_serialization`] set for the current thread.
///
/// Nested calls are supported, the previous flag value is restored on return
/// (including on panic, via `RunOnDrop`).
#[cfg(ipc)]
pub fn with_ipc_serialization<R>(serialize: impl FnOnce() -> R) -> R {
    let parent = IPC_SERIALIZATION.replace(true);
    let _clean = RunOnDrop::new(|| IPC_SERIALIZATION.set(parent));
    serialize()
}
932
/// Gets if the current thread is inside a [`with_ipc_serialization`] call.
#[cfg(ipc)]
pub fn is_ipc_serialization() -> bool {
    IPC_SERIALIZATION.get()
}
938
#[cfg(ipc)]
thread_local! {
    // flag set by `with_ipc_serialization`, read by `IpcBytes::serialize`
    static IPC_SERIALIZATION: Cell<bool> = const { Cell::new(false) };
}
943
944impl IpcBytes {
945 pub fn downgrade(&self) -> WeakIpcBytes {
949 WeakIpcBytes(Arc::downgrade(&self.0))
950 }
951}
952
953pub struct WeakIpcBytes(Weak<IpcBytesData>);
955impl WeakIpcBytes {
956 pub fn upgrade(&self) -> Option<IpcBytes> {
958 self.0.upgrade().map(IpcBytes)
959 }
960
961 pub fn strong_count(&self) -> usize {
963 self.0.strong_count()
964 }
965}
966
/// Async writer that builds an [`IpcBytes`].
///
/// Wraps [`IpcBytesWriterBlocking`] in `blocking::Unblock` so the blocking
/// writes run on the thread pool.
pub struct IpcBytesWriter {
    inner: blocking::Unblock<IpcBytesWriterBlocking>,
}
impl IpcBytesWriter {
    /// Finishes writing, returns the immutable bytes.
    pub async fn finish(self) -> std::io::Result<IpcBytes> {
        let inner = self.inner.into_inner().await;
        blocking::unblock(move || inner.finish()).await
    }

    /// Finishes writing, returns the buffer as mutable bytes.
    pub async fn finish_mut(self) -> std::io::Result<IpcBytesMut> {
        let inner = self.inner.into_inner().await;
        blocking::unblock(move || inner.finish_mut()).await
    }
}
986impl crate::io::AsyncWrite for IpcBytesWriter {
987 fn poll_write(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8]) -> std::task::Poll<io::Result<usize>> {
988 crate::io::AsyncWrite::poll_write(Pin::new(&mut Pin::get_mut(self).inner), cx, buf)
989 }
990
991 fn poll_flush(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
992 crate::io::AsyncWrite::poll_flush(Pin::new(&mut Pin::get_mut(self).inner), cx)
993 }
994
995 fn poll_close(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
996 crate::io::AsyncWrite::poll_close(Pin::new(&mut Pin::get_mut(self).inner), cx)
997 }
998}
impl crate::io::AsyncSeek for IpcBytesWriter {
    fn poll_seek(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, pos: io::SeekFrom) -> std::task::Poll<io::Result<u64>> {
        // delegates to the `Unblock` wrapper; note that the blocking writer
        // allocates a memmap file on first seek (see `IpcBytesWriterBlocking::seek`)
        crate::io::AsyncSeek::poll_seek(Pin::new(&mut Pin::get_mut(self).inner), cx, pos)
    }
}
1004
/// Blocking writer that builds an [`IpcBytes`].
///
/// Buffers on the heap and spills into a memory mapped file when the payload
/// grows past `IpcBytes::UNNAMED_MAX` or when seeking is used.
pub struct IpcBytesWriterBlocking {
    // pending bytes, flushed to the memmap file when one is allocated
    #[cfg(ipc)]
    heap_buf: Vec<u8>,
    // file storage, allocated lazily by `alloc_memmap_file`
    #[cfg(ipc)]
    memmap: Option<(PathBuf, std::fs::File)>,

    // without ipc everything is heap allocated; the cursor supports `Seek`
    #[cfg(not(ipc))]
    heap_buf: std::io::Cursor<Vec<u8>>,
}
impl IpcBytesWriterBlocking {
    /// Finishes writing, returns the immutable bytes.
    pub fn finish(self) -> std::io::Result<IpcBytes> {
        let m = self.finish_mut()?;
        m.finish_blocking()
    }

    /// Finishes writing, returns the buffer as mutable bytes.
    pub fn finish_mut(mut self) -> std::io::Result<IpcBytesMut> {
        self.flush()?;
        #[cfg(ipc)]
        {
            let (len, inner) = match self.memmap {
                Some((name, write_handle)) => {
                    // SAFETY-NOTE(review): assumes no other process writes the
                    // file, the exclusive lock from `alloc_memmap_file` upholds this.
                    let map = unsafe { memmap2::MmapMut::map_mut(&write_handle) }?;
                    let len = map.len();
                    (len, IpcBytesMutInner::MemMap { name, map, write_handle })
                }
                None => {
                    let len = self.heap_buf.len();
                    // upgrade to shared memory if the buffer outgrew the inline limit
                    let i = if self.heap_buf.len() > IpcBytes::INLINE_MAX {
                        IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_bytes(&self.heap_buf))
                    } else {
                        IpcBytesMutInner::Heap(self.heap_buf)
                    };
                    (len, i)
                }
            };
            Ok(IpcBytesMut { len, inner })
        }
        #[cfg(not(ipc))]
        {
            let heap_buf = self.heap_buf.into_inner();
            let len = heap_buf.len();
            let inner = IpcBytesMutInner::Heap(heap_buf);
            Ok(IpcBytesMut { len, inner })
        }
    }

    /// Creates the backing memmap file on first use and flushes `heap_buf` into it.
    #[cfg(ipc)]
    fn alloc_memmap_file(&mut self) -> io::Result<()> {
        if self.memmap.is_none() {
            let (name, file) = IpcBytes::create_memmap()?;
            // exclusive lock while writing
            file.lock()?;
            #[cfg(unix)]
            {
                // owner read+write while the file is being built
                let mut permissions = file.metadata()?.permissions();
                use std::os::unix::fs::PermissionsExt;
                permissions.set_mode(0o600);
                file.set_permissions(permissions)?;
            }
            self.memmap = Some((name, file));
        }
        let file = &mut self.memmap.as_mut().unwrap().1;

        file.write_all(&self.heap_buf)?;
        self.heap_buf.clear();
        Ok(())
    }
}
impl std::io::Write for IpcBytesWriterBlocking {
    fn write(&mut self, write_buf: &[u8]) -> io::Result<usize> {
        #[cfg(ipc)]
        {
            if self.heap_buf.len() + write_buf.len() > IpcBytes::UNNAMED_MAX {
                // too large for memory buffering, spill to the memmap file
                self.alloc_memmap_file()?;

                if write_buf.len() > IpcBytes::UNNAMED_MAX {
                    // huge single write, bypass the heap buffer entirely
                    self.memmap.as_mut().unwrap().1.write_all(write_buf)?;
                } else {
                    // keep buffering, flushed to the file on `flush`/`finish`
                    self.heap_buf.extend_from_slice(write_buf);
                }
            } else {
                if self.memmap.is_none() {
                    // doubling growth strategy, capped at the spill threshold
                    self.heap_buf
                        .reserve_exact((self.heap_buf.capacity().max(1024) * 2).min(IpcBytes::UNNAMED_MAX));
                }
                self.heap_buf.extend_from_slice(write_buf);
            }

            // all bytes are always accepted
            Ok(write_buf.len())
        }

        #[cfg(not(ipc))]
        {
            std::io::Write::write(&mut self.heap_buf, write_buf)
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        // only needs to do work when file storage exists
        #[cfg(ipc)]
        if let Some((_, file)) = &mut self.memmap {
            if !self.heap_buf.is_empty() {
                file.write_all(&self.heap_buf)?;
                self.heap_buf.clear();
            }
            file.flush()?;
        }
        Ok(())
    }
}
impl std::io::Seek for IpcBytesWriterBlocking {
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        #[cfg(ipc)]
        {
            // seeking is only supported on file storage, so force-allocate it
            self.alloc_memmap_file()?;
            let (_, file) = self.memmap.as_mut().unwrap();
            if !self.heap_buf.is_empty() {
                // flush buffered bytes before moving the cursor
                file.write_all(&self.heap_buf)?;
                self.heap_buf.clear();
            }
            file.seek(pos)
        }
        #[cfg(not(ipc))]
        {
            std::io::Seek::seek(&mut self.heap_buf, pos)
        }
    }
}
1141
/// Backing storage for [`IpcBytesMut`], mirrors [`IpcBytesData`] but writable.
enum IpcBytesMutInner {
    Heap(Vec<u8>),
    #[cfg(ipc)]
    AnonMemMap(IpcSharedMemory),
    #[cfg(ipc)]
    MemMap {
        name: PathBuf,
        map: memmap2::MmapMut,
        // keeps the exclusive lock alive while writable
        write_handle: std::fs::File,
    },
}
1153
/// Mutable byte buffer that converts into [`IpcBytes`].
pub struct IpcBytesMut {
    inner: IpcBytesMutInner,
    // logical length, can be less than the allocation after `truncate`
    len: usize,
}
impl ops::Deref for IpcBytesMut {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        // only the first `len` bytes are exposed, the allocation can be larger
        let len = self.len;
        match &self.inner {
            IpcBytesMutInner::Heap(v) => &v[..len],
            #[cfg(ipc)]
            IpcBytesMutInner::AnonMemMap(m) => &m[..len],
            #[cfg(ipc)]
            IpcBytesMutInner::MemMap { map, .. } => &map[..len],
        }
    }
}
1175impl ops::DerefMut for IpcBytesMut {
1176 fn deref_mut(&mut self) -> &mut Self::Target {
1177 let len = self.len;
1178 match &mut self.inner {
1179 IpcBytesMutInner::Heap(v) => &mut v[..len],
1180 #[cfg(ipc)]
1181 IpcBytesMutInner::AnonMemMap(m) => {
1182 unsafe { m.deref_mut() }
1184 }
1185 #[cfg(ipc)]
1186 IpcBytesMutInner::MemMap { map, .. } => &mut map[..len],
1187 }
1188 }
1189}
1190impl fmt::Debug for IpcBytesMut {
1191 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1192 write!(f, "IpcBytesMut(<{} bytes>)", self.len())
1193 }
1194}
impl IpcBytesMut {
    /// Allocates `len` zeroed bytes in the best fitting storage.
    pub async fn new(len: usize) -> io::Result<IpcBytesMut> {
        #[cfg(ipc)]
        if len <= IpcBytes::INLINE_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        } else if len <= IpcBytes::UNNAMED_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
            })
        } else {
            // file backed allocation, offload to the blocking pool
            blocking::unblock(move || Self::new_blocking(len)).await
        }

        #[cfg(not(ipc))]
        {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        }
    }

    /// Blocking version of [`IpcBytesMut::new`].
    pub fn new_blocking(len: usize) -> io::Result<IpcBytesMut> {
        #[cfg(ipc)]
        if len <= IpcBytes::INLINE_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        } else if len <= IpcBytes::UNNAMED_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
            })
        } else {
            let (name, file) = IpcBytes::create_memmap()?;
            // exclusive lock while writable
            file.lock()?;
            #[cfg(unix)]
            {
                // owner read+write while the buffer is mutable
                let mut permissions = file.metadata()?.permissions();
                use std::os::unix::fs::PermissionsExt;
                permissions.set_mode(0o600);
                file.set_permissions(permissions)?;
            }
            file.set_len(len as u64)?;
            // SAFETY-NOTE(review): assumes no other process maps/writes the
            // file, the exclusive lock above upholds this.
            let map = unsafe { memmap2::MmapMut::map_mut(&file) }?;
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::MemMap {
                    name,
                    map,
                    write_handle: file,
                },
            })
        }
        #[cfg(not(ipc))]
        {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        }
    }

    /// Converts the vec, reusing the allocation when it fits inline.
    pub async fn from_vec(buf: Vec<u8>) -> io::Result<Self> {
        #[cfg(ipc)]
        if buf.len() <= IpcBytes::INLINE_MAX {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        } else {
            // needs a copy into shared storage, offload to the blocking pool
            blocking::unblock(move || {
                let mut b = IpcBytes::new_mut_blocking(buf.len())?;
                b[..].copy_from_slice(&buf);
                Ok(b)
            })
            .await
        }
        #[cfg(not(ipc))]
        {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        }
    }

    /// Converts the immutable bytes into a mutable buffer.
    pub async fn from_bytes(bytes: IpcBytes) -> io::Result<Self> {
        blocking::unblock(move || Self::from_bytes_blocking(bytes)).await
    }

    /// Finishes writing, converts into immutable [`IpcBytes`].
    ///
    /// Excess allocation beyond the (possibly truncated) `len` is released.
    pub async fn finish(mut self) -> io::Result<IpcBytes> {
        let len = self.len;
        let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
            IpcBytesMutInner::Heap(mut v) => {
                v.truncate(len);
                v.shrink_to_fit();
                IpcBytesData::Heap(v)
            }
            #[cfg(ipc)]
            IpcBytesMutInner::AnonMemMap(m) => {
                if len < IpcBytes::INLINE_MAX {
                    // shrunk enough to fit inline
                    IpcBytesData::Heap(m[..len].to_vec())
                } else if len < m.len() {
                    // reallocate to drop the excess shared memory
                    IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&m[..len]))
                } else {
                    IpcBytesData::AnonMemMap(m)
                }
            }
            #[cfg(ipc)]
            IpcBytesMutInner::MemMap { name, map, write_handle } => {
                let len = self.len;
                // file truncation and remap are blocking operations
                blocking::unblock(move || Self::finish_memmap(name, map, write_handle, len)).await?
            }
        };
        Ok(IpcBytes(Arc::new(data)))
    }

    /// Truncates the file to `len`, remaps read-only and downgrades the
    /// lock and permissions to the shared read configuration.
    #[cfg(ipc)]
    fn finish_memmap(name: PathBuf, map: memmap2::MmapMut, write_handle: fs::File, len: usize) -> Result<IpcBytesData, io::Error> {
        let alloc_len = map.len();
        if alloc_len != len {
            write_handle.set_len(len as u64)?;
        }
        write_handle.unlock()?;
        let map = if alloc_len != len {
            // cannot resize a live map, drop and remap
            drop(map);
            // SAFETY-NOTE(review): assumes the file is no longer written,
            // permissions are set read-only below.
            unsafe { memmap2::Mmap::map(&write_handle) }?
        } else {
            map.make_read_only()?
        };
        // make the file read-only before sharing it
        let mut permissions = write_handle.metadata()?.permissions();
        permissions.set_readonly(true);
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            permissions.set_mode(0o400);
        }
        write_handle.set_permissions(permissions)?;
        drop(write_handle);
        let read_handle = std::fs::File::open(&name)?;
        // shared lock for the reader side, matches `IpcMemMap::read_map`
        read_handle.lock_shared()?;
        Ok(IpcBytesData::MemMap(IpcMemMap {
            name,
            range: 0..len,
            is_custom: false,
            map: IpcMemMapData::Connected(map, read_handle),
        }))
    }
}
1357impl IpcBytesMut {
    /// Blocking version of [`IpcBytesMut::from_vec`].
    pub fn from_vec_blocking(buf: Vec<u8>) -> io::Result<Self> {
        #[cfg(ipc)]
        if buf.len() <= IpcBytes::INLINE_MAX {
            // fits inline, reuse the allocation
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        } else {
            // copy into shared storage
            let mut b = IpcBytes::new_mut_blocking(buf.len())?;
            b[..].copy_from_slice(&buf);
            Ok(b)
        }
        #[cfg(not(ipc))]
        {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        }
    }
1379
1380 pub fn from_slice_blocking(buf: &[u8]) -> io::Result<Self> {
1382 #[cfg(ipc)]
1383 if buf.len() <= IpcBytes::INLINE_MAX {
1384 Ok(Self {
1385 len: buf.len(),
1386 inner: IpcBytesMutInner::Heap(buf.to_vec()),
1387 })
1388 } else {
1389 let mut b = IpcBytes::new_mut_blocking(buf.len())?;
1390 b[..].copy_from_slice(buf);
1391 Ok(b)
1392 }
1393 #[cfg(not(ipc))]
1394 {
1395 Ok(Self {
1396 len: buf.len(),
1397 inner: IpcBytesMutInner::Heap(buf.to_vec()),
1398 })
1399 }
1400 }
1401
1402 pub fn from_bytes_blocking(bytes: IpcBytes) -> io::Result<Self> {
1404 #[cfg_attr(not(ipc), allow(irrefutable_let_patterns))]
1405 if let IpcBytesData::Heap(_) = &*bytes.0 {
1406 match Arc::try_unwrap(bytes.0) {
1407 Ok(r) => match r {
1408 IpcBytesData::Heap(r) => Ok(Self {
1409 len: r.len(),
1410 inner: IpcBytesMutInner::Heap(r),
1411 }),
1412 _ => unreachable!(),
1413 },
1414 Err(a) => Self::from_slice_blocking(&IpcBytes(a)[..]),
1415 }
1416 } else {
1417 Self::from_slice_blocking(&bytes[..])
1418 }
1419 }
1420
1421 pub fn finish_blocking(mut self) -> io::Result<IpcBytes> {
1423 let len = self.len;
1424 let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
1425 IpcBytesMutInner::Heap(mut v) => {
1426 v.truncate(len);
1427 IpcBytesData::Heap(v)
1428 }
1429 #[cfg(ipc)]
1430 IpcBytesMutInner::AnonMemMap(m) => {
1431 if len < IpcBytes::INLINE_MAX {
1432 IpcBytesData::Heap(m[..len].to_vec())
1433 } else if len < m.len() {
1434 IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&m[..len]))
1435 } else {
1436 IpcBytesData::AnonMemMap(m)
1437 }
1438 }
1439 #[cfg(ipc)]
1440 IpcBytesMutInner::MemMap { name, map, write_handle } => Self::finish_memmap(name, map, write_handle, len)?,
1441 };
1442 Ok(IpcBytes(Arc::new(data)))
1443 }
1444}
#[cfg(ipc)]
impl Drop for IpcBytesMut {
    fn drop(&mut self) {
        // take the inner storage, cleanup is only needed for named memory maps
        let inner = std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![]));
        if let IpcBytesMutInner::MemMap { name, map, write_handle } = inner {
            // unmap and close the file before deleting it
            drop(map);
            drop(write_handle);
            // best effort, the file may already be gone
            std::fs::remove_file(name).ok();
        }
    }
}
1455
/// Mutable [`IpcBytesMut`] view casted to items of `T`.
pub struct IpcBytesMutCast<T: bytemuck::AnyBitPattern> {
    // underlying untyped bytes
    bytes: IpcBytesMut,
    _t: PhantomData<T>,
}
impl<T: bytemuck::AnyBitPattern> ops::Deref for IpcBytesMutCast<T> {
    type Target = [T];

    fn deref(&self) -> &Self::Target {
        // layout is validated eagerly when the cast view is created
        bytemuck::cast_slice::<u8, T>(&self.bytes)
    }
}
impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> ops::DerefMut for IpcBytesMutCast<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        bytemuck::cast_slice_mut::<u8, T>(&mut self.bytes)
    }
}
impl<T: bytemuck::AnyBitPattern> IpcBytesMutCast<T> {
    /// Unwraps the untyped mutable bytes.
    pub fn into_inner(self) -> IpcBytesMut {
        self.bytes
    }
}
impl<T: bytemuck::AnyBitPattern> From<IpcBytesMutCast<T>> for IpcBytesMut {
    fn from(value: IpcBytesMutCast<T>) -> Self {
        value.bytes
    }
}
1486impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> IpcBytesMutCast<T> {
1487 pub async fn new(len: usize) -> io::Result<Self> {
1489 IpcBytesMut::new(len * size_of::<T>()).await.map(IpcBytesMut::cast)
1490 }
1491
1492 pub fn new_blocking(len: usize) -> io::Result<Self> {
1494 IpcBytesMut::new_blocking(len * size_of::<T>()).map(IpcBytesMut::cast)
1495 }
1496
1497 pub async fn from_vec(data: Vec<T>) -> io::Result<Self> {
1499 IpcBytesMut::from_vec(bytemuck::cast_vec(data)).await.map(IpcBytesMut::cast)
1500 }
1501
1502 pub fn from_vec_blocking(data: Vec<T>) -> io::Result<Self> {
1504 IpcBytesMut::from_vec_blocking(bytemuck::cast_vec(data)).map(IpcBytesMut::cast)
1505 }
1506
1507 pub fn from_slice_blocking(data: &[T]) -> io::Result<Self> {
1509 IpcBytesMut::from_slice_blocking(bytemuck::cast_slice(data)).map(IpcBytesMut::cast)
1510 }
1511
1512 pub fn as_bytes(&mut self) -> &mut IpcBytesMut {
1514 &mut self.bytes
1515 }
1516
1517 pub async fn finish(self) -> io::Result<IpcBytesCast<T>> {
1519 self.bytes.finish().await.map(IpcBytes::cast)
1520 }
1521
1522 pub fn finish_blocking(self) -> io::Result<IpcBytesCast<T>> {
1524 self.bytes.finish_blocking().map(IpcBytes::cast)
1525 }
1526}
1527
impl IpcBytesMut {
    /// Casts the mutable bytes into a typed view over items of `T`.
    ///
    /// # Panics
    ///
    /// Panics if the byte length/alignment is not valid for a `[T]` slice.
    pub fn cast<T: bytemuck::AnyBitPattern>(self) -> IpcBytesMutCast<T> {
        let r = IpcBytesMutCast {
            bytes: self,
            _t: PhantomData,
        };
        // deref once so an invalid layout panics here, not on first use
        let _assert = &r[..];
        r
    }

    /// Views the bytes as a slice of `T`.
    ///
    /// # Panics
    ///
    /// Panics if the byte length/alignment is not valid for a `[T]` slice.
    pub fn cast_deref<T: bytemuck::AnyBitPattern>(&self) -> &[T] {
        bytemuck::cast_slice(self)
    }

    /// Views the bytes as a mutable slice of `T`.
    ///
    /// # Panics
    ///
    /// Panics if the byte length/alignment is not valid for a `[T]` slice.
    pub fn cast_deref_mut<T: bytemuck::AnyBitPattern + bytemuck::NoUninit>(&mut self) -> &mut [T] {
        bytemuck::cast_slice_mut(self)
    }
}
1569
/// Read-only [`IpcBytes`] view casted to items of `T`.
pub struct IpcBytesCast<T: bytemuck::AnyBitPattern> {
    // underlying untyped shared bytes
    bytes: IpcBytes,
    _t: PhantomData<T>,
}
impl<T: bytemuck::AnyBitPattern> Default for IpcBytesCast<T> {
    /// Empty bytes.
    fn default() -> Self {
        Self {
            bytes: Default::default(),
            _t: PhantomData,
        }
    }
}
impl<T: bytemuck::AnyBitPattern> ops::Deref for IpcBytesCast<T> {
    type Target = [T];

    fn deref(&self) -> &Self::Target {
        // layout is validated eagerly when the cast view is created
        bytemuck::cast_slice::<u8, T>(&self.bytes)
    }
}
impl<T: bytemuck::AnyBitPattern> IpcBytesCast<T> {
    /// Unwraps the untyped shared bytes.
    pub fn into_inner(self) -> IpcBytes {
        self.bytes
    }
}
impl<T: bytemuck::AnyBitPattern> From<IpcBytesCast<T>> for IpcBytes {
    fn from(value: IpcBytesCast<T>) -> Self {
        value.bytes
    }
}
1603impl<T: bytemuck::AnyBitPattern> Clone for IpcBytesCast<T> {
1604 fn clone(&self) -> Self {
1605 Self {
1606 bytes: self.bytes.clone(),
1607 _t: PhantomData,
1608 }
1609 }
1610}
1611impl<T: bytemuck::AnyBitPattern> fmt::Debug for IpcBytesCast<T> {
1612 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1613 write!(f, "IpcBytesCast<{}>(<{} items>)", std::any::type_name::<T>(), self.len())
1614 }
1615}
impl<T: bytemuck::AnyBitPattern> serde::Serialize for IpcBytesCast<T> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // delegates to the untyped bytes
        self.bytes.serialize(serializer)
    }
}
impl<'de, T: bytemuck::AnyBitPattern> serde::Deserialize<'de> for IpcBytesCast<T> {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let bytes = IpcBytes::deserialize(deserializer)?;
        // NOTE: `cast` panics if the received byte length/alignment does not fit `[T]`
        Ok(bytes.cast())
    }
}
impl<T: bytemuck::AnyBitPattern> PartialEq for IpcBytesCast<T> {
    fn eq(&self, other: &Self) -> bool {
        // equality of the underlying bytes
        self.bytes == other.bytes
    }
}
impl<T: bytemuck::AnyBitPattern> Eq for IpcBytesCast<T> {}
impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> IpcBytesCast<T> {
    /// New mutable typed buffer with space for `len` items.
    pub async fn new_mut(len: usize) -> io::Result<IpcBytesMutCast<T>> {
        IpcBytesMut::new(len * size_of::<T>()).await.map(IpcBytesMut::cast)
    }

    /// New mutable typed buffer with space for `len` items. Blocking.
    pub fn new_mut_blocking(len: usize) -> io::Result<IpcBytesMutCast<T>> {
        IpcBytesMut::new_blocking(len * size_of::<T>()).map(IpcBytesMut::cast)
    }

    /// New typed shared bytes from the items in `data`.
    pub async fn from_vec(data: Vec<T>) -> io::Result<Self> {
        IpcBytes::from_vec(bytemuck::cast_vec(data)).await.map(IpcBytes::cast)
    }

    /// New typed shared bytes collected from `iter`.
    ///
    /// Uses the iterator `size_hint` to select a strategy: small bounded
    /// iterators collect to a `Vec`, exact size iterators write items directly
    /// into a mutable buffer, the rest stream through a writer.
    pub async fn from_iter(iter: impl Iterator<Item = T>) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            let (min, max) = iter.size_hint();
            let l = size_of::<T>();
            // convert hints from item counts to byte lengths
            let min = min * l;
            let max = max.map(|m| m * l);
            if let Some(max) = max {
                if max <= IpcBytes::INLINE_MAX {
                    return Self::from_vec(iter.collect()).await;
                } else if max == min {
                    // exact size known, write items directly into the buffer
                    let mut r = IpcBytes::new_mut(max).await?;
                    let mut actual_len = 0;
                    for (i, f) in r.chunks_exact_mut(l).zip(iter) {
                        i.copy_from_slice(bytemuck::bytes_of(&f));
                        actual_len += 1;
                    }
                    // trim in case the iterator yielded less than the hint
                    r.truncate(actual_len * l);
                    return r.finish().await.map(IpcBytes::cast);
                }
            }

            // unknown or inexact upper bound, stream through a writer
            let mut writer = IpcBytes::new_writer().await;
            for f in iter {
                writer.write_all(bytemuck::bytes_of(&f)).await?;
            }
            writer.finish().await.map(IpcBytes::cast)
        }
        #[cfg(not(ipc))]
        {
            Self::from_vec(iter.collect()).await
        }
    }

    /// New typed shared bytes from the items in `data`. Blocking.
    pub fn from_vec_blocking(data: Vec<T>) -> io::Result<Self> {
        IpcBytes::from_vec_blocking(bytemuck::cast_vec(data)).map(IpcBytes::cast)
    }

    /// New typed shared bytes copied from `data`. Blocking.
    pub fn from_slice_blocking(data: &[T]) -> io::Result<Self> {
        IpcBytes::from_slice_blocking(bytemuck::cast_slice(data)).map(IpcBytes::cast)
    }

    /// New typed shared bytes collected from `iter`. Blocking.
    ///
    /// Same strategy selection as [`from_iter`](Self::from_iter).
    pub fn from_iter_blocking(mut iter: impl Iterator<Item = T>) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            let (min, max) = iter.size_hint();
            let l = size_of::<T>();
            // convert hints from item counts to byte lengths
            let min = min * l;
            let max = max.map(|m| m * l);
            if let Some(max) = max {
                if max <= IpcBytes::INLINE_MAX {
                    return Self::from_vec_blocking(iter.collect());
                } else if max == min {
                    // exact size known, write items directly into the buffer
                    let mut r = IpcBytes::new_mut_blocking(max)?;
                    let mut actual_len = 0;
                    for (i, f) in r.chunks_exact_mut(l).zip(&mut iter) {
                        i.copy_from_slice(bytemuck::bytes_of(&f));
                        actual_len += 1;
                    }
                    // trim in case the iterator yielded less than the hint
                    r.truncate(actual_len * l);
                    return r.finish_blocking().map(IpcBytes::cast);
                }
            }

            // unknown or inexact upper bound, stream through a writer
            let mut writer = IpcBytes::new_writer_blocking();
            for f in iter {
                writer.write_all(bytemuck::bytes_of(&f))?;
            }
            writer.finish().map(IpcBytes::cast)
        }
        #[cfg(not(ipc))]
        {
            Self::from_vec_blocking(iter.collect())
        }
    }

    /// Borrows the untyped shared bytes.
    pub fn as_bytes(&self) -> &IpcBytes {
        &self.bytes
    }
}
1756
impl IpcBytes {
    /// Casts the shared bytes into a typed view over items of `T`.
    ///
    /// # Panics
    ///
    /// Panics if the byte length/alignment is not valid for a `[T]` slice.
    pub fn cast<T: bytemuck::AnyBitPattern>(self) -> IpcBytesCast<T> {
        let r = IpcBytesCast {
            bytes: self,
            _t: PhantomData,
        };
        // deref once so an invalid layout panics here, not on first use
        let _assert = &r[..];
        r
    }

    /// Views the bytes as a slice of `T`.
    ///
    /// # Panics
    ///
    /// Panics if the byte length/alignment is not valid for a `[T]` slice.
    pub fn cast_deref<T: bytemuck::AnyBitPattern>(&self) -> &[T] {
        bytemuck::cast_slice(self)
    }
}
1787
1788impl IpcBytesMut {
1789 pub fn truncate(&mut self, new_len: usize) {
1795 self.len = self.len.min(new_len);
1796 }
1797
1798 pub fn reduce_in_place<const L0: usize, const L1: usize>(&mut self, mut reduce: impl FnMut([u8; L0]) -> [u8; L1]) {
1808 assert!(L1 <= L0);
1809
1810 let self_ = &mut self[..];
1811
1812 let len = self_.len();
1813 if len == 0 {
1814 return;
1815 }
1816 assert!(len.is_multiple_of(L0), "length must be multiple of L0");
1817
1818 let ptr = self_.as_mut_ptr();
1819 let mut write = 0usize;
1820 let mut read = 0usize;
1821
1822 unsafe {
1824 while read < len {
1825 let mut in_chunk = MaybeUninit::<[u8; L0]>::uninit();
1826 std::ptr::copy_nonoverlapping(ptr.add(read), (*in_chunk.as_mut_ptr()).as_mut_ptr(), L0);
1827 read += L0;
1828
1829 let out_chunk = reduce(in_chunk.assume_init());
1830
1831 std::ptr::copy_nonoverlapping(out_chunk.as_ptr(), ptr.add(write), L1);
1832 write += L1;
1833 }
1834 }
1835
1836 self.truncate(write);
1837 }
1838
1839 pub fn reduce_in_place_dyn(&mut self, in_chunk_len: usize, out_chunk_buf: &mut [u8], mut reduce: impl FnMut(&[u8], &mut [u8])) {
1849 assert!(out_chunk_buf.len() < in_chunk_len);
1850
1851 let self_ = &mut self[..];
1852
1853 let len = self_.len();
1854 if len == 0 {
1855 return;
1856 }
1857 assert!(len.is_multiple_of(in_chunk_len), "length must be multiple of in_chunk_len");
1858
1859 let ptr = self_.as_mut_ptr();
1860 let mut write = 0usize;
1861 let mut read = 0usize;
1862
1863 unsafe {
1865 while read < len {
1866 reduce(std::slice::from_raw_parts(ptr.add(read), in_chunk_len), &mut *out_chunk_buf);
1867 read += in_chunk_len;
1868
1869 std::ptr::copy_nonoverlapping(out_chunk_buf.as_ptr(), ptr.add(write), out_chunk_buf.len());
1870 write += out_chunk_buf.len();
1871 }
1872 }
1873
1874 self.truncate(write);
1875 }
1876
1877 pub fn cast_reduce_in_place<T0, const L0: usize, T1, const L1: usize>(&mut self, mut reduce: impl FnMut([T0; L0]) -> [T1; L1])
1887 where
1888 T0: bytemuck::AnyBitPattern,
1889 {
1890 let l0 = std::mem::size_of::<T0>() * L0;
1891 let l1 = std::mem::size_of::<T1>() * L1;
1892 assert!(l1 <= l0);
1893
1894 let self_ = &mut self[..];
1895
1896 let len = self_.len();
1897 if len == 0 {
1898 return;
1899 }
1900 assert!(len.is_multiple_of(l0), "length must be multiple of size_of::<T0>() * L0");
1901
1902 let ptr = self_.as_mut_ptr();
1903 let mut write = 0usize;
1904 let mut read = 0usize;
1905
1906 unsafe {
1910 while read < len {
1911 let mut in_chunk = MaybeUninit::<[T0; L0]>::uninit();
1912 std::ptr::copy_nonoverlapping(ptr.add(read), (*in_chunk.as_mut_ptr()).as_mut_ptr() as _, l0);
1913 read += l0;
1914
1915 let out_chunk = reduce(in_chunk.assume_init());
1916
1917 std::ptr::copy_nonoverlapping(out_chunk.as_ptr() as _, ptr.add(write), l1);
1918 write += l1;
1919 }
1920 }
1921
1922 self.truncate(write);
1923 }
1924
1925 pub fn cast_reduce_in_place_dyn<T0, T1>(
1937 &mut self,
1938 in_chunk_len: usize,
1939 out_chunk_buf: &mut [T1],
1940 mut reduce: impl FnMut(&[T0], &mut [T1]),
1941 ) where
1942 T0: bytemuck::AnyBitPattern,
1943 {
1944 let l0 = std::mem::size_of::<T0>() * in_chunk_len;
1945 let l1 = std::mem::size_of_val(out_chunk_buf);
1946
1947 assert!(l1 <= l0);
1948
1949 let self_ = &mut self[..];
1950
1951 let len = self_.len();
1952 if len == 0 {
1953 return;
1954 }
1955 assert!(len.is_multiple_of(l0), "length must be multiple of size_of::<T0>() * in_chunk_len");
1956
1957 let ptr = self_.as_mut_ptr();
1958 let mut write = 0usize;
1959 let mut read = 0usize;
1960
1961 unsafe {
1963 while read < len {
1964 reduce(
1965 bytemuck::cast_slice(std::slice::from_raw_parts(ptr.add(read), l0)),
1966 &mut *out_chunk_buf,
1967 );
1968 read += l0;
1969
1970 std::ptr::copy_nonoverlapping(out_chunk_buf.as_ptr() as _, ptr.add(write), l1);
1971 write += l1;
1972 }
1973 }
1974
1975 self.truncate(write);
1976 }
1977
1978 pub fn reverse_chunks<const L: usize>(&mut self) {
1986 let self_ = &mut self[..];
1987
1988 let len = self_.len();
1989
1990 if len == 0 || L == 0 {
1991 return;
1992 }
1993
1994 if L == 1 {
1995 return self_.reverse();
1996 }
1997
1998 assert!(len.is_multiple_of(L), "length must be multiple of L");
1999
2000 unsafe { self_.as_chunks_unchecked_mut::<L>() }.reverse();
2002 }
2003
2004 pub fn reverse_chunks_dyn(&mut self, chunk_len: usize) {
2010 let self_ = &mut self[..];
2011
2012 let len = self_.len();
2013
2014 if len == 0 || chunk_len == 0 {
2015 return;
2016 }
2017
2018 if chunk_len == 1 {
2019 return self_.reverse();
2020 }
2021
2022 assert!(len.is_multiple_of(chunk_len), "length must be multiple of chunk_len");
2023
2024 let mut a = 0;
2025 let mut b = len - chunk_len;
2026
2027 let ptr = self_.as_mut_ptr();
2028
2029 unsafe {
2031 while a < b {
2032 std::ptr::swap_nonoverlapping(ptr.add(a), ptr.add(b), chunk_len);
2033 a += chunk_len;
2034 b -= chunk_len;
2035 }
2036 }
2037 }
2038}
2039
/// Borrowed byte iterator, the lifetime is tied to the owning `IpcBytes` below.
type SliceIter<'a> = std::slice::Iter<'a, u8>;
// self-referential cell that owns the `IpcBytes` and the slice iterator borrowing from it
self_cell::self_cell! {
    struct IpcBytesIntoIterInner {
        owner: IpcBytes,
        #[covariant]
        dependent: SliceIter,
    }
}
2051
/// Owning byte iterator for [`IpcBytes`].
pub struct IpcBytesIntoIter(IpcBytesIntoIterInner);
impl IpcBytesIntoIter {
    fn new(bytes: IpcBytes) -> Self {
        Self(IpcBytesIntoIterInner::new(bytes, |b| b.iter()))
    }

    /// The source bytes this iterator was created from.
    pub fn source(&self) -> &IpcBytes {
        self.0.borrow_owner()
    }

    /// The bytes not yet yielded by the iterator.
    pub fn rest(&self) -> &[u8] {
        self.0.borrow_dependent().as_slice()
    }
}
2069impl Iterator for IpcBytesIntoIter {
2070 type Item = u8;
2071
2072 fn next(&mut self) -> Option<u8> {
2073 self.0.with_dependent_mut(|_, d| d.next().copied())
2074 }
2075
2076 fn size_hint(&self) -> (usize, Option<usize>) {
2077 self.0.borrow_dependent().size_hint()
2078 }
2079
2080 fn count(self) -> usize
2081 where
2082 Self: Sized,
2083 {
2084 self.0.borrow_dependent().as_slice().len()
2085 }
2086
2087 fn nth(&mut self, n: usize) -> Option<u8> {
2088 self.0.with_dependent_mut(|_, d| d.nth(n).copied())
2089 }
2090
2091 fn last(mut self) -> Option<Self::Item>
2092 where
2093 Self: Sized,
2094 {
2095 self.next_back()
2096 }
2097}
2098impl DoubleEndedIterator for IpcBytesIntoIter {
2099 fn next_back(&mut self) -> Option<Self::Item> {
2100 self.0.with_dependent_mut(|_, d| d.next_back().copied())
2101 }
2102
2103 fn nth_back(&mut self, n: usize) -> Option<Self::Item> {
2104 self.0.with_dependent_mut(|_, d| d.nth_back(n).copied())
2105 }
2106}
impl FusedIterator for IpcBytesIntoIter {}
impl Default for IpcBytesIntoIter {
    /// Iterator over empty bytes.
    fn default() -> Self {
        IpcBytes::empty().into_iter()
    }
}
impl IntoIterator for IpcBytes {
    type Item = u8;

    type IntoIter = IpcBytesIntoIter;

    fn into_iter(self) -> Self::IntoIter {
        IpcBytesIntoIter::new(self)
    }
}
2122
/// Owning typed iterator for [`IpcBytesCast`].
pub struct IpcBytesCastIntoIter<T: bytemuck::AnyBitPattern>(IpcBytesIntoIter, IpcBytesCast<T>);
impl<T: bytemuck::AnyBitPattern> IpcBytesCastIntoIter<T> {
    fn new(bytes: IpcBytesCast<T>) -> Self {
        // the clone is cheap (shared handle); the second field keeps the typed source accessible
        Self(bytes.bytes.clone().into_iter(), bytes)
    }

    /// The source this iterator was created from.
    pub fn source(&self) -> &IpcBytesCast<T> {
        &self.1
    }

    /// The items not yet yielded by the iterator.
    pub fn rest(&self) -> &[T] {
        bytemuck::cast_slice(self.0.rest())
    }
}
2140impl<T: bytemuck::AnyBitPattern> Iterator for IpcBytesCastIntoIter<T> {
2141 type Item = T;
2142
2143 fn next(&mut self) -> Option<T> {
2144 let size = size_of::<T>();
2145 let r = *bytemuck::from_bytes(self.0.rest().get(..size)?);
2146 self.0.nth(size - 1);
2147 Some(r)
2148 }
2149
2150 fn size_hint(&self) -> (usize, Option<usize>) {
2151 let (mut min, mut max) = self.0.size_hint();
2152 min /= size_of::<T>();
2153 if let Some(max) = &mut max {
2154 *max /= size_of::<T>();
2155 }
2156 (min, max)
2157 }
2158
2159 fn nth(&mut self, n: usize) -> Option<T> {
2160 let size = size_of::<T>();
2161
2162 let byte_skip = n.checked_mul(size)?;
2163 let byte_end = byte_skip.checked_add(size)?;
2164
2165 let bytes = self.0.rest().get(byte_skip..byte_end)?;
2166 let r = *bytemuck::from_bytes(bytes);
2167
2168 self.0.nth(byte_end - 1);
2169
2170 Some(r)
2171 }
2172
2173 fn last(mut self) -> Option<Self::Item>
2174 where
2175 Self: Sized,
2176 {
2177 self.next_back()
2178 }
2179}
2180impl<T: bytemuck::AnyBitPattern> DoubleEndedIterator for IpcBytesCastIntoIter<T> {
2181 fn next_back(&mut self) -> Option<T> {
2182 let size = size_of::<T>();
2183
2184 let len = self.0.rest().len();
2185 if len < size {
2186 return None;
2187 }
2188
2189 let start = len - size;
2190 let bytes = &self.0.rest()[start..];
2191 let r = *bytemuck::from_bytes(bytes);
2192
2193 self.0.nth_back(size - 1);
2194
2195 Some(r)
2196 }
2197
2198 fn nth_back(&mut self, n: usize) -> Option<T> {
2199 let size = size_of::<T>();
2200
2201 let rev_byte_skip = n.checked_mul(size)?;
2202 let rev_byte_end = rev_byte_skip.checked_add(size)?;
2203 let len = self.0.rest().len();
2204
2205 if len < rev_byte_end {
2206 return None;
2207 }
2208
2209 let start = len - rev_byte_end;
2210 let end = len - rev_byte_skip;
2211
2212 let bytes = &self.0.rest()[start..end];
2213 let r = *bytemuck::from_bytes(bytes);
2214
2215 self.0.nth_back(rev_byte_end - 1);
2216
2217 Some(r)
2218 }
2219}
impl<T: bytemuck::AnyBitPattern> FusedIterator for IpcBytesCastIntoIter<T> {}
impl<T: bytemuck::AnyBitPattern> Default for IpcBytesCastIntoIter<T> {
    /// Iterator over empty bytes.
    fn default() -> Self {
        IpcBytes::empty().cast::<T>().into_iter()
    }
}
impl<T: bytemuck::AnyBitPattern> IntoIterator for IpcBytesCast<T> {
    type Item = T;

    type IntoIter = IpcBytesCastIntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        IpcBytesCastIntoIter::new(self)
    }
}