1#![cfg_attr(not(ipc), allow(unused))]
2
3use std::{
4 cell::Cell,
5 fmt, fs,
6 io::{self, Read, Write},
7 iter::FusedIterator,
8 marker::PhantomData,
9 mem::MaybeUninit,
10 ops,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, Weak},
14};
15
16use futures_lite::{AsyncReadExt, AsyncWriteExt as _};
17#[cfg(ipc)]
18use ipc_channel::ipc::IpcSharedMemory;
19use parking_lot::Mutex;
20use serde::{Deserialize, Serialize, de::VariantAccess};
21use zng_app_context::RunOnDrop;
22
/// Immutable, cheaply-clonable byte buffer optimized for inter-process transfer.
///
/// Internally an `Arc` over `IpcBytesData`: heap bytes, anonymous shared memory
/// or a named memory-mapped file depending on size.
#[derive(Clone)]
#[repr(transparent)]
pub struct IpcBytes(Arc<IpcBytesData>);
/// Backing storage variants for `IpcBytes`.
enum IpcBytesData {
    /// Small buffers, plain heap memory (copied when serialized).
    Heap(Vec<u8>),
    /// Medium buffers, anonymous shared memory.
    #[cfg(ipc)]
    AnonMemMap(IpcSharedMemory),
    /// Large buffers, named read-only memory-mapped file.
    #[cfg(ipc)]
    MemMap(IpcMemMap),
}
/// Read-only memory-mapped file backing large `IpcBytes`.
#[cfg(ipc)]
struct IpcMemMap {
    // Path of the backing file; deleted on drop unless `is_custom`.
    name: PathBuf,
    // Sub-range of the file this instance represents (serialized alongside `name`).
    range: ops::Range<usize>,
    // True when the file was supplied by the user (`open_memmap`), so it must not be deleted.
    is_custom: bool,
    // `Option` so `Drop` can release the map before the file handle.
    map: Option<memmap2::Mmap>,
    // Shared-locked handle kept open for the lifetime of the map.
    read_handle: Option<fs::File>,
}
64impl fmt::Debug for IpcBytes {
65 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
66 write!(f, "IpcBytes(<{} bytes>)", self.len())
67 }
68}
impl ops::Deref for IpcBytes {
    type Target = [u8];

    /// Borrows the bytes regardless of the backing storage.
    fn deref(&self) -> &Self::Target {
        match &*self.0 {
            IpcBytesData::Heap(i) => i,
            #[cfg(ipc)]
            IpcBytesData::AnonMemMap(m) => m,
            // `map` is only `None` transiently inside `IpcMemMap::drop`, so unwrap cannot fail.
            // NOTE(review): this returns the full map and ignores `f.range` — confirm whether
            // sub-range maps (`open_memmap` with `Some(range)`) are expected to slice here.
            #[cfg(ipc)]
            IpcBytesData::MemMap(f) => f.map.as_ref().unwrap(),
        }
    }
}
82
83impl IpcBytes {
84 pub fn empty() -> Self {
86 IpcBytes(Arc::new(IpcBytesData::Heap(vec![])))
87 }
88}
89impl IpcBytes {
91 pub async fn new_writer() -> IpcBytesWriter {
93 IpcBytesWriter {
94 inner: blocking::Unblock::new(Self::new_writer_blocking()),
95 }
96 }
97
98 pub async fn new_mut(len: usize) -> io::Result<IpcBytesMut> {
100 IpcBytesMut::new(len).await
101 }
102
103 pub async fn from_vec(data: Vec<u8>) -> io::Result<Self> {
105 blocking::unblock(move || Self::from_vec_blocking(data)).await
106 }
107
108 pub async fn from_iter(iter: impl Iterator<Item = u8>) -> io::Result<Self> {
118 #[cfg(ipc)]
119 {
120 let (min, max) = iter.size_hint();
121 if let Some(max) = max {
122 if max <= Self::INLINE_MAX {
123 return Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))));
124 } else if max == min {
125 let mut r = IpcBytes::new_mut(max).await?;
126 let mut actual_len = 0;
127 for (i, b) in r.iter_mut().zip(iter) {
128 *i = b;
129 actual_len += 1;
130 }
131 r.truncate(actual_len);
132 return r.finish().await;
133 }
134 }
135
136 let mut writer = Self::new_writer().await;
137 for b in iter {
138 writer.write_all(&[b]).await?;
139 }
140 writer.finish().await
141 }
142
143 #[cfg(not(ipc))]
144 {
145 Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))))
146 }
147 }
148
149 pub async fn from_read(data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
151 #[cfg(ipc)]
152 {
153 Self::from_read_ipc(data).await
154 }
155 #[cfg(not(ipc))]
156 {
157 let mut data = data;
158 let mut buf = vec![];
159 data.read_to_end(&mut buf).await;
160 Self::from_vec(buf).await
161 }
162 }
163 #[cfg(ipc)]
164 async fn from_read_ipc(mut data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
165 let mut buf = vec![0u8; Self::INLINE_MAX + 1];
166 let mut len = 0;
167
168 loop {
170 match data.read(&mut buf[len..]).await {
171 Ok(l) => {
172 if l == 0 {
173 buf.truncate(len);
175 return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
176 } else {
177 len += l;
178 if len == Self::INLINE_MAX + 1 {
179 break;
181 }
182 }
183 }
184 Err(e) => match e.kind() {
185 io::ErrorKind::WouldBlock => continue,
186 _ => return Err(e),
187 },
188 }
189 }
190
191 buf.resize(Self::UNNAMED_MAX + 1, 0);
193 loop {
194 match data.read(&mut buf[len..]).await {
195 Ok(l) => {
196 if l == 0 {
197 return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
199 } else {
200 len += l;
201 if len == Self::UNNAMED_MAX + 1 {
202 break;
204 }
205 }
206 }
207 Err(e) => match e.kind() {
208 io::ErrorKind::WouldBlock => continue,
209 _ => return Err(e),
210 },
211 }
212 }
213
214 Self::new_memmap(async |m| {
216 use futures_lite::AsyncWriteExt as _;
217
218 m.write_all(&buf).await?;
219 crate::io::copy(data, m).await?;
220 Ok(())
221 })
222 .await
223 }
224
225 pub async fn from_path(path: PathBuf) -> io::Result<Self> {
227 let file = crate::fs::File::open(path).await?;
228 Self::from_file(file).await
229 }
230 pub async fn from_file(mut file: crate::fs::File) -> io::Result<Self> {
232 #[cfg(ipc)]
233 {
234 let len = file.metadata().await?.len();
235 if len <= Self::UNNAMED_MAX as u64 {
236 let mut buf = vec![0u8; len as usize];
237 file.read_exact(&mut buf).await?;
238 Self::from_vec_blocking(buf)
239 } else {
240 Self::new_memmap(async move |m| {
241 crate::io::copy(&mut file, m).await?;
242 Ok(())
243 })
244 .await
245 }
246 }
247 #[cfg(not(ipc))]
248 {
249 let mut buf = vec![];
250 file.read_to_end(&mut buf).await?;
251 Self::from_vec_blocking(buf)
252 }
253 }
254
255 #[cfg(ipc)]
260 pub async fn new_memmap(write: impl AsyncFnOnce(&mut crate::fs::File) -> io::Result<()>) -> io::Result<Self> {
261 let (name, file) = blocking::unblock(Self::create_memmap).await?;
262 let mut file = crate::fs::File::from(file);
263 write(&mut file).await?;
264
265 let mut permissions = file.metadata().await?.permissions();
266 permissions.set_readonly(true);
267 #[cfg(unix)]
268 {
269 use std::os::unix::fs::PermissionsExt;
270 permissions.set_mode(0o400);
271 }
272 file.set_permissions(permissions).await?;
273
274 blocking::unblock(move || {
275 drop(file);
276 let map = IpcMemMap::read(name, None)?;
277 Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
278 })
279 .await
280 }
281
282 #[cfg(ipc)]
295 pub async unsafe fn open_memmap(file: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
296 blocking::unblock(move || {
297 unsafe { Self::open_memmap_blocking(file, range) }
299 })
300 .await
301 }
302
303 pub fn ptr_eq(&self, other: &Self) -> bool {
305 let a = &self[..];
306 let b = &other[..];
307 (std::ptr::eq(a, b) && a.len() == b.len()) || (a.is_empty() && b.is_empty())
308 }
309
310 #[cfg(ipc)]
311 const INLINE_MAX: usize = 64 * 1024; #[cfg(ipc)]
313 const UNNAMED_MAX: usize = 128 * 1024 * 1024; }
315
impl IpcBytes {
    /// Starts a blocking writer that incrementally builds an `IpcBytes`.
    pub fn new_writer_blocking() -> IpcBytesWriterBlocking {
        IpcBytesWriterBlocking {
            #[cfg(ipc)]
            heap_buf: vec![],
            #[cfg(ipc)]
            memmap: None,

            #[cfg(not(ipc))]
            heap_buf: std::io::Cursor::new(vec![]),
        }
    }

    /// Allocates a zeroed mutable buffer of `len` bytes.
    pub fn new_mut_blocking(len: usize) -> io::Result<IpcBytesMut> {
        IpcBytesMut::new_blocking(len)
    }

    /// Copies a slice, picking storage by length: heap, anonymous shared memory or memmap file.
    pub fn from_slice_blocking(data: &[u8]) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            if data.len() <= Self::INLINE_MAX {
                Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
            } else if data.len() <= Self::UNNAMED_MAX {
                Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(data)))))
            } else {
                Self::new_memmap_blocking(|m| m.write_all(data))
            }
        }
        #[cfg(not(ipc))]
        {
            Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
        }
    }

    /// Converts a vector; small vectors are reused as-is without copying.
    pub fn from_vec_blocking(data: Vec<u8>) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            if data.len() <= Self::INLINE_MAX {
                Ok(Self(Arc::new(IpcBytesData::Heap(data))))
            } else {
                // too large for the heap variant, copy into shared storage
                Self::from_slice_blocking(&data)
            }
        }
        #[cfg(not(ipc))]
        {
            Ok(Self(Arc::new(IpcBytesData::Heap(data))))
        }
    }

    /// Collects an iterator, using `size_hint` to pick the storage upfront when possible.
    pub fn from_iter_blocking(iter: impl Iterator<Item = u8>) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            let (min, max) = iter.size_hint();
            if let Some(max) = max {
                if max <= Self::INLINE_MAX {
                    return Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))));
                } else if max == min {
                    // exact size known, fill a preallocated buffer
                    let mut r = IpcBytes::new_mut_blocking(max)?;
                    let mut actual_len = 0;
                    for (i, b) in r.iter_mut().zip(iter) {
                        *i = b;
                        actual_len += 1;
                    }
                    r.truncate(actual_len);
                    return r.finish_blocking();
                }
            }

            // unknown size, stream through a writer
            let mut writer = Self::new_writer_blocking();
            for b in iter {
                writer.write_all(&[b])?;
            }
            writer.finish()
        }
        #[cfg(not(ipc))]
        {
            Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))))
        }
    }

    /// Reads an entire reader, upgrading storage as the data grows.
    pub fn from_read_blocking(data: &mut dyn io::Read) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            Self::from_read_blocking_ipc(data)
        }
        #[cfg(not(ipc))]
        {
            let mut buf = vec![];
            data.read_to_end(&mut buf)?;
            Self::from_vec_blocking(buf)
        }
    }
    #[cfg(ipc)]
    fn from_read_blocking_ipc(data: &mut dyn io::Read) -> io::Result<Self> {
        let mut buf = vec![0u8; Self::INLINE_MAX + 1];
        let mut len = 0;

        // phase 1: EOF within INLINE_MAX means the data fits on the heap
        loop {
            match data.read(&mut buf[len..]) {
                Ok(l) => {
                    if l == 0 {
                        buf.truncate(len);
                        return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
                    } else {
                        len += l;
                        if len == Self::INLINE_MAX + 1 {
                            break;
                        }
                    }
                }
                Err(e) => match e.kind() {
                    io::ErrorKind::WouldBlock => continue,
                    _ => return Err(e),
                },
            }
        }

        // phase 2: EOF within UNNAMED_MAX fits in anonymous shared memory
        buf.resize(Self::UNNAMED_MAX + 1, 0);
        loop {
            match data.read(&mut buf[len..]) {
                Ok(l) => {
                    if l == 0 {
                        return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
                    } else {
                        len += l;
                        if len == Self::UNNAMED_MAX + 1 {
                            break;
                        }
                    }
                }
                Err(e) => match e.kind() {
                    io::ErrorKind::WouldBlock => continue,
                    _ => return Err(e),
                },
            }
        }

        // phase 3: too large for memory, spill everything into a memmapped file
        Self::new_memmap_blocking(|m| {
            m.write_all(&buf)?;
            io::copy(data, m)?;
            Ok(())
        })
    }

    /// Reads a file path into `IpcBytes`.
    pub fn from_path_blocking(path: &Path) -> io::Result<Self> {
        let file = fs::File::open(path)?;
        Self::from_file_blocking(file)
    }
    /// Reads an open file, using its metadata length to pick the storage upfront.
    pub fn from_file_blocking(mut file: fs::File) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            let len = file.metadata()?.len();
            if len <= Self::UNNAMED_MAX as u64 {
                let mut buf = vec![0u8; len as usize];
                file.read_exact(&mut buf)?;
                Self::from_vec_blocking(buf)
            } else {
                Self::new_memmap_blocking(|m| {
                    io::copy(&mut file, m)?;
                    Ok(())
                })
            }
        }
        #[cfg(not(ipc))]
        {
            let mut buf = vec![];
            file.read_to_end(&mut buf)?;
            Self::from_vec_blocking(buf)
        }
    }

    /// Creates a new memmap file, lets `write` fill it, then reopens it read-only and mapped.
    #[cfg(ipc)]
    pub fn new_memmap_blocking(write: impl FnOnce(&mut fs::File) -> io::Result<()>) -> io::Result<Self> {
        let (name, mut file) = Self::create_memmap()?;
        write(&mut file)?;
        // lock down permissions before exposing the read-only map
        let mut permissions = file.metadata()?.permissions();
        permissions.set_readonly(true);
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            permissions.set_mode(0o400);
        }
        file.set_permissions(permissions)?;

        // close the write handle before mapping read-only
        drop(file);
        let map = IpcMemMap::read(name, None)?;
        Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
    }
    /// Creates a uniquely named file in the per-process memmap cache dir.
    #[cfg(ipc)]
    fn create_memmap() -> io::Result<(PathBuf, fs::File)> {
        // monotonic file-name counter; also gates one-time registration of exit cleanup
        static MEMMAP_DIR: Mutex<usize> = Mutex::new(0);
        let mut count = MEMMAP_DIR.lock();

        if *count == 0 {
            zng_env::on_process_exit(|_| {
                IpcBytes::cleanup_memmap_storage();
            });
        }

        let dir = zng_env::cache("zng-task-ipc-mem").join(std::process::id().to_string());
        fs::create_dir_all(&dir)?;
        let mut name = dir.join(count.to_string());
        if *count < usize::MAX {
            *count += 1;
        } else {
            // counter exhausted, linear-probe for a free file name
            for i in 0..usize::MAX {
                name = dir.join(i.to_string());
                if !name.exists() {
                    break;
                }
            }
            if name.exists() {
                return Err(io::Error::new(io::ErrorKind::StorageFull, ""));
            }
        };

        let file = fs::OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .truncate(true)
            .open(&name)?;
        Ok((name, file))
    }
    /// Removes all memmap cache dirs; registered to run on process exit.
    #[cfg(ipc)]
    fn cleanup_memmap_storage() {
        if let Ok(dir) = fs::read_dir(zng_env::cache("zng-task-ipc-mem")) {
            let entries: Vec<_> = dir.flatten().map(|e| e.path()).collect();
            for entry in entries {
                if entry.is_dir() {
                    fs::remove_dir_all(entry).ok();
                }
            }
        }
    }

    /// Maps a user-provided file (optionally a sub-range) without taking ownership of it.
    ///
    /// # Safety
    ///
    /// The caller must guarantee the file is not modified or truncated while mapped
    /// (the `memmap2::Mmap::map` contract).
    #[cfg(ipc)]
    pub unsafe fn open_memmap_blocking(file: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
        let read_handle = fs::File::open(&file)?;
        read_handle.lock_shared()?;
        let len = read_handle.metadata()?.len();
        if let Some(range) = &range
            && len < range.end as u64
        {
            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "file length < range.end"));
        }
        // SAFETY: upheld by this function's caller contract (file not modified while mapped).
        let map = unsafe { memmap2::Mmap::map(&read_handle) }?;

        let range = range.unwrap_or_else(|| 0..map.len());

        Ok(Self(Arc::new(IpcBytesData::MemMap(IpcMemMap {
            name: file,
            range,
            read_handle: Some(read_handle),
            // user file, do not delete it on drop
            is_custom: true,
            map: Some(map),
        }))))
    }
}
618
619impl AsRef<[u8]> for IpcBytes {
620 fn as_ref(&self) -> &[u8] {
621 &self[..]
622 }
623}
624impl Default for IpcBytes {
625 fn default() -> Self {
626 Self::empty()
627 }
628}
629impl PartialEq for IpcBytes {
630 fn eq(&self, other: &Self) -> bool {
631 self.ptr_eq(other) || self[..] == other[..]
632 }
633}
// Byte-wise equality is a full equivalence relation, so `Eq` holds.
impl Eq for IpcBytes {}
#[cfg(ipc)]
impl IpcMemMap {
    /// Opens `name` read-only, takes a shared lock and maps the whole file.
    ///
    /// `range` defaults to the full mapped length. The result is not user-provided
    /// (`is_custom: false`), so the file is deleted when the map is dropped.
    fn read(name: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
        let read_handle = fs::File::open(&name)?;
        // shared lock marks the file as in use while mapped
        read_handle.lock_shared()?;
        // SAFETY assumption: the write side sets the file read-only before any map is
        // created, so it is not modified while mapped — TODO confirm against writers.
        let map = unsafe { memmap2::Mmap::map(&read_handle) }?;

        let range = range.unwrap_or_else(|| 0..map.len());

        Ok(IpcMemMap {
            name,
            range,
            is_custom: false,
            read_handle: Some(read_handle),
            map: Some(map),
        })
    }
}
#[cfg(ipc)]
impl Serialize for IpcMemMap {
    /// Serializes only `(name, range)`; the receiving process re-maps the file itself.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let payload = (&self.name, self.range.clone());
        payload.serialize(serializer)
    }
}
#[cfg(ipc)]
impl<'de> Deserialize<'de> for IpcMemMap {
    /// Deserializes the `(name, range)` pair written by `Serialize` and re-maps the file.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let (name, range) = <(PathBuf, ops::Range<usize>)>::deserialize(deserializer)?;
        // can fail if the sending process already removed the backing file
        IpcMemMap::read(name, Some(range)).map_err(|e| serde::de::Error::custom(format!("cannot load ipc memory map file, {e}")))
    }
}
#[cfg(ipc)]
impl Drop for IpcMemMap {
    fn drop(&mut self) {
        // release the mapping before the (locked) file handle
        self.map.take();
        self.read_handle.take();
        // delete the backing file unless it was supplied by the user
        if !self.is_custom {
            std::fs::remove_file(&self.name).ok();
        }
    }
}
683
impl Serialize for IpcBytes {
    /// Serializes as an `IpcBytes` enum.
    ///
    /// Inside `with_ipc_serialization` the actual storage variant is sent (zero-copy for
    /// shared memory / memmap); otherwise the bytes are copied into the `Heap` variant.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        #[cfg(ipc)]
        {
            if is_ipc_serialization() {
                match &*self.0 {
                    IpcBytesData::Heap(b) => serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&b[..])),
                    IpcBytesData::AnonMemMap(b) => serializer.serialize_newtype_variant("IpcBytes", 1, "AnonMemMap", b),
                    IpcBytesData::MemMap(b) => {
                        // channel the receiving process uses to signal it has re-mapped the file
                        let (sender, mut recv) = crate::channel::ipc_unbounded::<()>()
                            .map_err(|e| serde::ser::Error::custom(format!("cannot serialize memmap bytes for ipc, {e}")))?;

                        let r = serializer.serialize_newtype_variant("IpcBytes", 2, "MemMap", &(b, sender))?;
                        // keep a clone (and so the backing file) alive until the receiver
                        // confirms, otherwise the file could be deleted before it is opened
                        let hold = self.clone();
                        crate::spawn_wait(move || {
                            if let Err(e) = recv.recv_blocking() {
                                tracing::error!("IpcBytes memmap completion signal not received, {e}")
                            }
                            drop(hold);
                        });
                        Ok(r)
                    }
                }
            } else {
                serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&self[..]))
            }
        }
        #[cfg(not(ipc))]
        {
            serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&self[..]))
        }
    }
}
impl<'de> Deserialize<'de> for IpcBytes {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // variant tags mirroring the serialize side
        #[derive(Deserialize)]
        enum VariantId {
            Heap,
            #[cfg(ipc)]
            AnonMemMap,
            #[cfg(ipc)]
            MemMap,
        }

        struct EnumVisitor;
        impl<'de> serde::de::Visitor<'de> for EnumVisitor {
            type Value = IpcBytes;

            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                write!(f, "IpcBytes variant")
            }

            fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
            where
                A: serde::de::EnumAccess<'de>,
            {
                let (variant, access) = data.variant::<VariantId>()?;
                match variant {
                    VariantId::Heap => access.newtype_variant_seed(ByteSliceVisitor),
                    #[cfg(ipc)]
                    VariantId::AnonMemMap => Ok(IpcBytes(Arc::new(IpcBytesData::AnonMemMap(access.newtype_variant()?)))),
                    #[cfg(ipc)]
                    VariantId::MemMap => {
                        let (memmap, mut completion_sender): (IpcMemMap, crate::channel::IpcSender<()>) = access.newtype_variant()?;
                        // tell the sender the file is mapped so it can release its hold on it
                        completion_sender.send_blocking(()).map_err(|e| {
                            serde::de::Error::custom(format!("cannot deserialize memmap bytes, completion signal failed, {e}"))
                        })?;
                        Ok(IpcBytes(Arc::new(IpcBytesData::MemMap(memmap))))
                    }
                }
            }
        }
        // handles the `Heap` payload from borrowed, transient or owned byte representations
        struct ByteSliceVisitor;
        impl<'de> serde::de::Visitor<'de> for ByteSliceVisitor {
            type Value = IpcBytes;

            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                write!(f, "byte buffer")
            }

            fn visit_borrowed_bytes<E>(self, v: &'de [u8]) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                IpcBytes::from_slice_blocking(v).map_err(serde::de::Error::custom)
            }

            fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                IpcBytes::from_slice_blocking(v).map_err(serde::de::Error::custom)
            }

            fn visit_byte_buf<E>(self, v: Vec<u8>) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                // owned buffer can be reused without copying
                IpcBytes::from_vec_blocking(v).map_err(serde::de::Error::custom)
            }
        }
        impl<'de> serde::de::DeserializeSeed<'de> for ByteSliceVisitor {
            type Value = IpcBytes;

            fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
            where
                D: serde::Deserializer<'de>,
            {
                deserializer.deserialize_bytes(ByteSliceVisitor)
            }
        }

        #[cfg(ipc)]
        {
            deserializer.deserialize_enum("IpcBytes", &["Heap", "AnonMemMap", "MemMap"], EnumVisitor)
        }
        #[cfg(not(ipc))]
        {
            deserializer.deserialize_enum("IpcBytes", &["Heap"], EnumVisitor)
        }
    }
}
814
#[cfg(ipc)]
pub fn with_ipc_serialization<R>(serialize: impl FnOnce() -> R) -> R {
    let previous = IPC_SERIALIZATION.replace(true);
    // restore the previous flag even if `serialize` panics
    let _restore = RunOnDrop::new(move || IPC_SERIALIZATION.set(previous));
    serialize()
}
828
#[cfg(ipc)]
pub fn is_ipc_serialization() -> bool {
    IPC_SERIALIZATION.with(Cell::get)
}
834
#[cfg(ipc)]
thread_local! {
    // Flag toggled by `with_ipc_serialization` to select the zero-copy serialize path.
    static IPC_SERIALIZATION: Cell<bool> = const { Cell::new(false) };
}
839
840impl IpcBytes {
841 pub fn downgrade(&self) -> WeakIpcBytes {
845 WeakIpcBytes(Arc::downgrade(&self.0))
846 }
847}
848
/// Weak handle to an `IpcBytes` allocation, created by `IpcBytes::downgrade`.
pub struct WeakIpcBytes(Weak<IpcBytesData>);
851impl WeakIpcBytes {
852 pub fn upgrade(&self) -> Option<IpcBytes> {
854 self.0.upgrade().map(IpcBytes)
855 }
856
857 pub fn strong_count(&self) -> usize {
859 self.0.strong_count()
860 }
861}
862
/// Async writer that builds an `IpcBytes`; implements `AsyncWrite` and `AsyncSeek`.
pub struct IpcBytesWriter {
    // blocking writer driven on a background thread by `Unblock`
    inner: blocking::Unblock<IpcBytesWriterBlocking>,
}
impl IpcBytesWriter {
    /// Finishes into an immutable `IpcBytes`.
    pub async fn finish(self) -> std::io::Result<IpcBytes> {
        // recover the blocking writer, then finish it off the async executor
        let inner = self.inner.into_inner().await;
        blocking::unblock(move || inner.finish()).await
    }

    /// Finishes into a mutable `IpcBytesMut`.
    pub async fn finish_mut(self) -> std::io::Result<IpcBytesMut> {
        let inner = self.inner.into_inner().await;
        blocking::unblock(move || inner.finish_mut()).await
    }
}
882impl crate::io::AsyncWrite for IpcBytesWriter {
883 fn poll_write(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8]) -> std::task::Poll<io::Result<usize>> {
884 crate::io::AsyncWrite::poll_write(Pin::new(&mut Pin::get_mut(self).inner), cx, buf)
885 }
886
887 fn poll_flush(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
888 crate::io::AsyncWrite::poll_flush(Pin::new(&mut Pin::get_mut(self).inner), cx)
889 }
890
891 fn poll_close(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
892 crate::io::AsyncWrite::poll_close(Pin::new(&mut Pin::get_mut(self).inner), cx)
893 }
894}
895impl crate::io::AsyncSeek for IpcBytesWriter {
896 fn poll_seek(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, pos: io::SeekFrom) -> std::task::Poll<io::Result<u64>> {
897 crate::io::AsyncSeek::poll_seek(Pin::new(&mut Pin::get_mut(self).inner), cx, pos)
898 }
899}
900
/// Blocking incremental writer for `IpcBytes`.
///
/// Buffers on the heap and spills into a memory-mapped file once the data
/// exceeds `IpcBytes::UNNAMED_MAX` or a seek is requested.
pub struct IpcBytesWriterBlocking {
    // pending bytes not yet flushed to the file (or all data while no file exists)
    #[cfg(ipc)]
    heap_buf: Vec<u8>,
    // backing file, allocated lazily on overflow or first seek
    #[cfg(ipc)]
    memmap: Option<(PathBuf, std::fs::File)>,

    #[cfg(not(ipc))]
    heap_buf: std::io::Cursor<Vec<u8>>,
}
impl IpcBytesWriterBlocking {
    /// Finishes into an immutable `IpcBytes`.
    pub fn finish(self) -> std::io::Result<IpcBytes> {
        let m = self.finish_mut()?;
        m.finish_blocking()
    }

    /// Finishes into a mutable `IpcBytesMut`, reusing the storage already written.
    pub fn finish_mut(mut self) -> std::io::Result<IpcBytesMut> {
        self.flush()?;
        #[cfg(ipc)]
        {
            let (len, inner) = match self.memmap {
                Some((name, write_handle)) => {
                    // SAFETY assumption: `write_handle` is exclusively locked by this
                    // writer, so no other mapping aliases it — TODO confirm.
                    let map = unsafe { memmap2::MmapMut::map_mut(&write_handle) }?;
                    let len = map.len();
                    (len, IpcBytesMutInner::MemMap { name, map, write_handle })
                }
                None => {
                    let len = self.heap_buf.len();
                    // promote to shared memory when over the inline threshold
                    let i = if self.heap_buf.len() > IpcBytes::INLINE_MAX {
                        IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_bytes(&self.heap_buf))
                    } else {
                        IpcBytesMutInner::Heap(self.heap_buf)
                    };
                    (len, i)
                }
            };
            Ok(IpcBytesMut { len, inner })
        }
        #[cfg(not(ipc))]
        {
            let heap_buf = self.heap_buf.into_inner();
            let len = heap_buf.len();
            let inner = IpcBytesMutInner::Heap(heap_buf);
            Ok(IpcBytesMut { len, inner })
        }
    }

    /// Ensures the backing file exists, then flushes `heap_buf` into it.
    #[cfg(ipc)]
    fn alloc_memmap_file(&mut self) -> io::Result<()> {
        if self.memmap.is_none() {
            let (name, file) = IpcBytes::create_memmap()?;
            // exclusive lock while the file is writable
            file.lock()?;
            #[cfg(unix)]
            {
                let mut permissions = file.metadata()?.permissions();
                use std::os::unix::fs::PermissionsExt;
                // owner read+write while the data is being built
                permissions.set_mode(0o600);
                file.set_permissions(permissions)?;
            }
            self.memmap = Some((name, file));
        }
        let file = &mut self.memmap.as_mut().unwrap().1;

        file.write_all(&self.heap_buf)?;
        self.heap_buf.clear();
        Ok(())
    }
}
impl std::io::Write for IpcBytesWriterBlocking {
    fn write(&mut self, write_buf: &[u8]) -> io::Result<usize> {
        #[cfg(ipc)]
        {
            if self.heap_buf.len() + write_buf.len() > IpcBytes::UNNAMED_MAX {
                // buffer would exceed the in-memory limit: spill it into the backing file
                self.alloc_memmap_file()?;

                if write_buf.len() > IpcBytes::UNNAMED_MAX {
                    // huge inputs go straight to the file, bypassing the buffer
                    self.memmap.as_mut().unwrap().1.write_all(write_buf)?;
                } else {
                    self.heap_buf.extend_from_slice(write_buf);
                }
            } else {
                if self.memmap.is_none() {
                    // doubling growth, capped at the in-memory limit
                    self.heap_buf
                        .reserve_exact((self.heap_buf.capacity().max(1024) * 2).min(IpcBytes::UNNAMED_MAX));
                }
                self.heap_buf.extend_from_slice(write_buf);
            }

            // the full input is always accepted
            Ok(write_buf.len())
        }

        #[cfg(not(ipc))]
        {
            std::io::Write::write(&mut self.heap_buf, write_buf)
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        // only file-backed storage needs flushing; heap-only data stays buffered
        #[cfg(ipc)]
        if let Some((_, file)) = &mut self.memmap {
            if !self.heap_buf.is_empty() {
                file.write_all(&self.heap_buf)?;
                self.heap_buf.clear();
            }
            file.flush()?;
        }
        Ok(())
    }
}
impl std::io::Seek for IpcBytesWriterBlocking {
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        #[cfg(ipc)]
        {
            // seeking requires file-backed storage; allocate it and flush pending bytes first
            self.alloc_memmap_file()?;
            let (_, file) = self.memmap.as_mut().unwrap();
            if !self.heap_buf.is_empty() {
                file.write_all(&self.heap_buf)?;
                self.heap_buf.clear();
            }
            file.seek(pos)
        }
        #[cfg(not(ipc))]
        {
            std::io::Seek::seek(&mut self.heap_buf, pos)
        }
    }
}
1037
/// Backing storage variants for `IpcBytesMut`.
enum IpcBytesMutInner {
    /// Small buffers, plain heap memory.
    Heap(Vec<u8>),
    /// Medium buffers, anonymous shared memory.
    #[cfg(ipc)]
    AnonMemMap(IpcSharedMemory),
    /// Large buffers, mutable memory-mapped file (file is exclusively locked while mutable).
    #[cfg(ipc)]
    MemMap {
        // backing file path; removed on drop if the buffer is never finished
        name: PathBuf,
        map: memmap2::MmapMut,
        write_handle: std::fs::File,
    },
}
1049
/// Mutable byte buffer that can be frozen into an immutable `IpcBytes`.
pub struct IpcBytesMut {
    // storage: heap, anonymous shared memory or memmapped file
    inner: IpcBytesMutInner,
    // logical length; may be smaller than the allocation after truncation
    len: usize,
}
impl ops::Deref for IpcBytesMut {
    type Target = [u8];

    /// Borrows only the logical `len` bytes, which may be less than the allocation.
    fn deref(&self) -> &Self::Target {
        let len = self.len;
        match &self.inner {
            IpcBytesMutInner::Heap(v) => &v[..len],
            #[cfg(ipc)]
            IpcBytesMutInner::AnonMemMap(m) => &m[..len],
            #[cfg(ipc)]
            IpcBytesMutInner::MemMap { map, .. } => &map[..len],
        }
    }
}
1071impl ops::DerefMut for IpcBytesMut {
1072 fn deref_mut(&mut self) -> &mut Self::Target {
1073 let len = self.len;
1074 match &mut self.inner {
1075 IpcBytesMutInner::Heap(v) => &mut v[..len],
1076 #[cfg(ipc)]
1077 IpcBytesMutInner::AnonMemMap(m) => {
1078 unsafe { m.deref_mut() }
1080 }
1081 #[cfg(ipc)]
1082 IpcBytesMutInner::MemMap { map, .. } => &mut map[..len],
1083 }
1084 }
1085}
1086impl fmt::Debug for IpcBytesMut {
1087 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1088 write!(f, "IpcBytesMut(<{} bytes>)", self.len())
1089 }
1090}
impl IpcBytesMut {
    /// Allocates a zeroed buffer of `len` bytes, picking storage by size.
    ///
    /// Heap up to `INLINE_MAX`, anonymous shared memory up to `UNNAMED_MAX`,
    /// otherwise a memmapped file created on a blocking thread.
    pub async fn new(len: usize) -> io::Result<IpcBytesMut> {
        #[cfg(ipc)]
        if len <= IpcBytes::INLINE_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        } else if len <= IpcBytes::UNNAMED_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
            })
        } else {
            // file creation is blocking I/O
            blocking::unblock(move || Self::new_blocking(len)).await
        }

        #[cfg(not(ipc))]
        {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        }
    }

    /// Blocking variant of [`new`](Self::new).
    pub fn new_blocking(len: usize) -> io::Result<IpcBytesMut> {
        #[cfg(ipc)]
        if len <= IpcBytes::INLINE_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        } else if len <= IpcBytes::UNNAMED_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
            })
        } else {
            let (name, file) = IpcBytes::create_memmap()?;
            // exclusive lock while the mapping is mutable
            file.lock()?;
            #[cfg(unix)]
            {
                let mut permissions = file.metadata()?.permissions();
                use std::os::unix::fs::PermissionsExt;
                // owner read+write while mutable
                permissions.set_mode(0o600);
                file.set_permissions(permissions)?;
            }
            file.set_len(len as u64)?;
            // SAFETY assumption: the file is exclusively locked, so no other mapping
            // aliases it while mutable — TODO confirm.
            let map = unsafe { memmap2::MmapMut::map_mut(&file) }?;
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::MemMap {
                    name,
                    map,
                    write_handle: file,
                },
            })
        }
        #[cfg(not(ipc))]
        {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        }
    }

    /// Converts a vector; large vectors are copied into shared storage on a blocking thread.
    pub async fn from_vec(buf: Vec<u8>) -> io::Result<Self> {
        #[cfg(ipc)]
        if buf.len() <= IpcBytes::INLINE_MAX {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        } else {
            blocking::unblock(move || {
                let mut b = IpcBytes::new_mut_blocking(buf.len())?;
                b[..].copy_from_slice(&buf);
                Ok(b)
            })
            .await
        }
        #[cfg(not(ipc))]
        {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        }
    }

    /// Freezes into an immutable `IpcBytes`, shrinking storage to the logical length.
    pub async fn finish(mut self) -> io::Result<IpcBytes> {
        let len = self.len;
        // take the storage so `Drop` sees an empty heap variant and does not delete the file
        let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
            IpcBytesMutInner::Heap(mut v) => {
                v.truncate(len);
                v.shrink_to_fit();
                IpcBytesData::Heap(v)
            }
            #[cfg(ipc)]
            IpcBytesMutInner::AnonMemMap(m) => {
                if len < IpcBytes::INLINE_MAX {
                    // shrunk below the inline threshold, demote to heap
                    IpcBytesData::Heap(m[..len].to_vec())
                } else if len < m.len() {
                    // truncated, reallocate the shared memory at the exact size
                    IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&m[..len]))
                } else {
                    IpcBytesData::AnonMemMap(m)
                }
            }
            #[cfg(ipc)]
            IpcBytesMutInner::MemMap { name, map, write_handle } => {
                let len = self.len;
                // truncation + remap is blocking I/O
                blocking::unblock(move || Self::finish_memmap(name, map, write_handle, len)).await?
            }
        };
        Ok(IpcBytes(Arc::new(data)))
    }

    /// Truncates the backing file to `len`, makes it read-only and re-maps it for sharing.
    #[cfg(ipc)]
    fn finish_memmap(name: PathBuf, map: memmap2::MmapMut, write_handle: fs::File, len: usize) -> Result<IpcBytesData, io::Error> {
        let alloc_len = map.len();
        if alloc_len != len {
            write_handle.set_len(len as u64)?;
        }
        // release the exclusive write lock before readers take shared locks
        write_handle.unlock()?;
        let map = if alloc_len != len {
            // length changed, the old mapping is stale; drop it and re-map read-only
            drop(map);
            unsafe { memmap2::Mmap::map(&write_handle) }?
        } else {
            map.make_read_only()?
        };
        let mut permissions = write_handle.metadata()?.permissions();
        permissions.set_readonly(true);
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            // owner read-only from now on
            permissions.set_mode(0o400);
        }
        write_handle.set_permissions(permissions)?;
        drop(write_handle);
        let read_handle = std::fs::File::open(&name)?;
        read_handle.lock_shared()?;
        Ok(IpcBytesData::MemMap(IpcMemMap {
            name,
            range: 0..len,
            is_custom: false,
            map: Some(map),
            read_handle: Some(read_handle),
        }))
    }
}
1249impl IpcBytesMut {
1250 pub fn from_vec_blocking(buf: Vec<u8>) -> io::Result<Self> {
1252 #[cfg(ipc)]
1253 if buf.len() <= IpcBytes::INLINE_MAX {
1254 Ok(Self {
1255 len: buf.len(),
1256 inner: IpcBytesMutInner::Heap(buf),
1257 })
1258 } else {
1259 let mut b = IpcBytes::new_mut_blocking(buf.len())?;
1260 b[..].copy_from_slice(&buf);
1261 Ok(b)
1262 }
1263 #[cfg(not(ipc))]
1264 {
1265 Ok(Self {
1266 len: buf.len(),
1267 inner: IpcBytesMutInner::Heap(buf),
1268 })
1269 }
1270 }
1271
1272 pub fn finish_blocking(mut self) -> io::Result<IpcBytes> {
1274 let len = self.len;
1275 let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
1276 IpcBytesMutInner::Heap(mut v) => {
1277 v.truncate(len);
1278 IpcBytesData::Heap(v)
1279 }
1280 #[cfg(ipc)]
1281 IpcBytesMutInner::AnonMemMap(m) => {
1282 if len < IpcBytes::INLINE_MAX {
1283 IpcBytesData::Heap(m[..len].to_vec())
1284 } else if len < m.len() {
1285 IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&m[..len]))
1286 } else {
1287 IpcBytesData::AnonMemMap(m)
1288 }
1289 }
1290 #[cfg(ipc)]
1291 IpcBytesMutInner::MemMap { name, map, write_handle } => Self::finish_memmap(name, map, write_handle, len)?,
1292 };
1293 Ok(IpcBytes(Arc::new(data)))
1294 }
1295}
#[cfg(ipc)]
impl Drop for IpcBytesMut {
    fn drop(&mut self) {
        // a file-backed buffer dropped without `finish` must delete its backing file
        if let IpcBytesMutInner::MemMap { name, map, write_handle } = std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
            // unmap before closing the (locked) write handle, then remove the file
            drop(map);
            drop(write_handle);
            std::fs::remove_file(name).ok();
        }
    }
}
1306
/// Typed mutable view over an `IpcBytesMut`, reinterpreting the bytes as `[T]` via bytemuck.
pub struct IpcBytesMutCast<T: bytemuck::AnyBitPattern> {
    bytes: IpcBytesMut,
    // marker only; the buffer owns plain bytes
    _t: PhantomData<T>,
}
impl<T: bytemuck::AnyBitPattern> ops::Deref for IpcBytesMutCast<T> {
    type Target = [T];

    fn deref(&self) -> &Self::Target {
        // length/alignment compatibility is checked eagerly in `IpcBytesMut::cast`,
        // where bytemuck panics on mismatch
        bytemuck::cast_slice::<u8, T>(&self.bytes)
    }
}
// `NoUninit` is additionally required so writes through `&mut [T]` cannot expose padding.
impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> ops::DerefMut for IpcBytesMutCast<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        bytemuck::cast_slice_mut::<u8, T>(&mut self.bytes)
    }
}
1326impl<T: bytemuck::AnyBitPattern> IpcBytesMutCast<T> {
1327 pub fn into_inner(self) -> IpcBytesMut {
1329 self.bytes
1330 }
1331}
1332impl<T: bytemuck::AnyBitPattern> From<IpcBytesMutCast<T>> for IpcBytesMut {
1333 fn from(value: IpcBytesMutCast<T>) -> Self {
1334 value.bytes
1335 }
1336}
impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> IpcBytesMutCast<T> {
    /// Allocates a zeroed buffer of `len` items of `T`.
    pub async fn new(len: usize) -> io::Result<Self> {
        IpcBytesMut::new(len * size_of::<T>()).await.map(IpcBytesMut::cast)
    }

    /// Blocking variant of [`new`](Self::new).
    pub fn new_blocking(len: usize) -> io::Result<Self> {
        IpcBytesMut::new_blocking(len * size_of::<T>()).map(IpcBytesMut::cast)
    }

    /// Converts a typed vector into a typed shared buffer.
    pub async fn from_vec(data: Vec<T>) -> io::Result<Self> {
        IpcBytesMut::from_vec(bytemuck::cast_vec(data)).await.map(IpcBytesMut::cast)
    }

    /// Blocking variant of [`from_vec`](Self::from_vec).
    pub fn from_vec_blocking(data: Vec<T>) -> io::Result<Self> {
        IpcBytesMut::from_vec_blocking(bytemuck::cast_vec(data)).map(IpcBytesMut::cast)
    }

    /// Mutably borrows the underlying untyped buffer.
    pub fn as_bytes(&mut self) -> &mut IpcBytesMut {
        &mut self.bytes
    }

    /// Freezes into an immutable typed buffer.
    pub async fn finish(self) -> io::Result<IpcBytesCast<T>> {
        self.bytes.finish().await.map(IpcBytes::cast)
    }

    /// Blocking variant of [`finish`](Self::finish).
    pub fn finish_blocking(self) -> io::Result<IpcBytesCast<T>> {
        self.bytes.finish_blocking().map(IpcBytes::cast)
    }
}
1373
impl IpcBytesMut {
    /// Casts the buffer to a mutable typed view over `T` items.
    ///
    /// # Panics
    ///
    /// Panics if the byte length is not a multiple of `size_of::<T>()` or the data is not
    /// aligned for `T` (validated by `bytemuck::cast_slice` in the deref).
    pub fn cast<T: bytemuck::AnyBitPattern>(self) -> IpcBytesMutCast<T> {
        let r = IpcBytesMutCast {
            bytes: self,
            _t: PhantomData,
        };
        // eagerly deref once so an invalid cast panics here instead of on first use
        let _assert = &r[..];
        r
    }

    /// Views the bytes as a slice of `T` without converting.
    ///
    /// # Panics
    ///
    /// Same validation as [`cast`].
    ///
    /// [`cast`]: Self::cast
    pub fn cast_deref<T: bytemuck::AnyBitPattern>(&self) -> &[T] {
        bytemuck::cast_slice(self)
    }

    /// Views the bytes as a mutable slice of `T` without converting.
    ///
    /// # Panics
    ///
    /// Same validation as [`cast`].
    ///
    /// [`cast`]: Self::cast
    pub fn cast_deref_mut<T: bytemuck::AnyBitPattern + bytemuck::NoUninit>(&mut self) -> &mut [T] {
        bytemuck::cast_slice_mut(self)
    }
}
1415
/// Shared memory buffer viewed as a read-only slice of `T` items.
///
/// Created by [`IpcBytes::cast`] or the constructors on this type.
pub struct IpcBytesCast<T: bytemuck::AnyBitPattern> {
    // untyped backing buffer, `cast` validates it derefs cleanly to `[T]`
    bytes: IpcBytes,
    // marker only, no `T` values are stored directly
    _t: PhantomData<T>,
}
1423impl<T: bytemuck::AnyBitPattern> Default for IpcBytesCast<T> {
1424 fn default() -> Self {
1425 Self {
1426 bytes: Default::default(),
1427 _t: PhantomData,
1428 }
1429 }
1430}
1431impl<T: bytemuck::AnyBitPattern> ops::Deref for IpcBytesCast<T> {
1432 type Target = [T];
1433
1434 fn deref(&self) -> &Self::Target {
1435 bytemuck::cast_slice::<u8, T>(&self.bytes)
1436 }
1437}
1438impl<T: bytemuck::AnyBitPattern> IpcBytesCast<T> {
1439 pub fn into_inner(self) -> IpcBytes {
1441 self.bytes
1442 }
1443}
1444impl<T: bytemuck::AnyBitPattern> From<IpcBytesCast<T>> for IpcBytes {
1445 fn from(value: IpcBytesCast<T>) -> Self {
1446 value.bytes
1447 }
1448}
1449impl<T: bytemuck::AnyBitPattern> Clone for IpcBytesCast<T> {
1450 fn clone(&self) -> Self {
1451 Self {
1452 bytes: self.bytes.clone(),
1453 _t: PhantomData,
1454 }
1455 }
1456}
1457impl<T: bytemuck::AnyBitPattern> fmt::Debug for IpcBytesCast<T> {
1458 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1459 write!(f, "IpcBytesCast<{}>(<{} items>)", std::any::type_name::<T>(), self.len())
1460 }
1461}
1462impl<T: bytemuck::AnyBitPattern> serde::Serialize for IpcBytesCast<T> {
1463 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1464 where
1465 S: serde::Serializer,
1466 {
1467 self.bytes.serialize(serializer)
1468 }
1469}
1470impl<'de, T: bytemuck::AnyBitPattern> serde::Deserialize<'de> for IpcBytesCast<T> {
1471 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1472 where
1473 D: serde::Deserializer<'de>,
1474 {
1475 let bytes = IpcBytes::deserialize(deserializer)?;
1476 Ok(bytes.cast())
1477 }
1478}
1479impl<T: bytemuck::AnyBitPattern> PartialEq for IpcBytesCast<T> {
1480 fn eq(&self, other: &Self) -> bool {
1481 self.bytes == other.bytes
1482 }
1483}
1484impl<T: bytemuck::AnyBitPattern> Eq for IpcBytesCast<T> {}
impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> IpcBytesCast<T> {
    /// Allocates a mutable buffer with space for `len` items of `T`.
    pub async fn new_mut(len: usize) -> io::Result<IpcBytesMutCast<T>> {
        IpcBytesMut::new(len * size_of::<T>()).await.map(IpcBytesMut::cast)
    }

    /// Blocking variant of [`new_mut`].
    ///
    /// [`new_mut`]: Self::new_mut
    pub fn new_mut_blocking(len: usize) -> io::Result<IpcBytesMutCast<T>> {
        IpcBytesMut::new_blocking(len * size_of::<T>()).map(IpcBytesMut::cast)
    }

    /// Copies the vec items into a shared memory buffer.
    pub async fn from_vec(data: Vec<T>) -> io::Result<Self> {
        IpcBytes::from_vec(bytemuck::cast_vec(data)).await.map(IpcBytes::cast)
    }

    /// Collects the iterator items into a shared memory buffer.
    ///
    /// Uses the iterator `size_hint` to pick the cheapest strategy.
    pub async fn from_iter(iter: impl Iterator<Item = T>) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            // convert the item count hints to byte counts
            let (min, max) = iter.size_hint();
            let l = size_of::<T>();
            let min = min * l;
            let max = max.map(|m| m * l);
            if let Some(max) = max {
                if max <= IpcBytes::INLINE_MAX {
                    // small enough to not need shared memory, collect on the heap
                    return Self::from_vec(iter.collect()).await;
                } else if max == min {
                    // exact size known, write items directly into a preallocated buffer
                    let mut r = IpcBytes::new_mut(max).await?;
                    let mut actual_len = 0;
                    for (i, f) in r.chunks_exact_mut(l).zip(iter) {
                        i.copy_from_slice(bytemuck::bytes_of(&f));
                        actual_len += 1;
                    }
                    // defensive truncate in case the iterator yielded fewer items than hinted
                    r.truncate(actual_len * l);
                    return r.finish().await.map(IpcBytes::cast);
                }
            }

            // size not known upfront, stream through a growing writer
            let mut writer = IpcBytes::new_writer().await;
            for f in iter {
                writer.write_all(bytemuck::bytes_of(&f)).await?;
            }
            writer.finish().await.map(IpcBytes::cast)
        }
        #[cfg(not(ipc))]
        {
            Self::from_vec(iter.collect()).await
        }
    }

    /// Blocking variant of [`from_vec`].
    ///
    /// [`from_vec`]: Self::from_vec
    pub fn from_vec_blocking(data: Vec<T>) -> io::Result<Self> {
        IpcBytes::from_vec_blocking(bytemuck::cast_vec(data)).map(IpcBytes::cast)
    }

    /// Copies the slice items into a shared memory buffer, blocking.
    pub fn from_slice_blocking(data: &[T]) -> io::Result<Self> {
        IpcBytes::from_slice_blocking(bytemuck::cast_slice(data)).map(IpcBytes::cast)
    }

    /// Blocking variant of [`from_iter`].
    ///
    /// [`from_iter`]: Self::from_iter
    pub fn from_iter_blocking(mut iter: impl Iterator<Item = T>) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            // convert the item count hints to byte counts
            let (min, max) = iter.size_hint();
            let l = size_of::<T>();
            let min = min * l;
            let max = max.map(|m| m * l);
            if let Some(max) = max {
                if max <= IpcBytes::INLINE_MAX {
                    // small enough to not need shared memory, collect on the heap
                    return Self::from_vec_blocking(iter.collect());
                } else if max == min {
                    // exact size known, write items directly into a preallocated buffer
                    let mut r = IpcBytes::new_mut_blocking(max)?;
                    let mut actual_len = 0;
                    for (i, f) in r.chunks_exact_mut(l).zip(&mut iter) {
                        i.copy_from_slice(bytemuck::bytes_of(&f));
                        actual_len += 1;
                    }
                    // defensive truncate in case the iterator yielded fewer items than hinted
                    r.truncate(actual_len * l);
                    return r.finish_blocking().map(IpcBytes::cast);
                }
            }

            // size not known upfront, stream through a growing writer
            let mut writer = IpcBytes::new_writer_blocking();
            for f in iter {
                writer.write_all(bytemuck::bytes_of(&f))?;
            }
            writer.finish().map(IpcBytes::cast)
        }
        #[cfg(not(ipc))]
        {
            Self::from_vec_blocking(iter.collect())
        }
    }

    /// Reference the untyped byte buffer.
    pub fn as_bytes(&self) -> &IpcBytes {
        &self.bytes
    }
}
1602
impl IpcBytes {
    /// Casts the buffer to a read-only typed view over `T` items.
    ///
    /// # Panics
    ///
    /// Panics if the byte length is not a multiple of `size_of::<T>()` or the data is not
    /// aligned for `T` (validated by `bytemuck::cast_slice` in the deref).
    pub fn cast<T: bytemuck::AnyBitPattern>(self) -> IpcBytesCast<T> {
        let r = IpcBytesCast {
            bytes: self,
            _t: PhantomData,
        };
        // eagerly deref once so an invalid cast panics here instead of on first use
        let _assert = &r[..];
        r
    }

    /// Views the bytes as a slice of `T` without converting.
    ///
    /// # Panics
    ///
    /// Same validation as [`cast`].
    ///
    /// [`cast`]: Self::cast
    pub fn cast_deref<T: bytemuck::AnyBitPattern>(&self) -> &[T] {
        bytemuck::cast_slice(self)
    }
}
1633
1634impl IpcBytesMut {
1635 pub fn truncate(&mut self, new_len: usize) {
1641 self.len = self.len.min(new_len);
1642 }
1643
1644 pub fn reduce_in_place<const L0: usize, const L1: usize>(&mut self, mut reduce: impl FnMut([u8; L0]) -> [u8; L1]) {
1654 assert!(L1 <= L0);
1655
1656 let self_ = &mut self[..];
1657
1658 let len = self_.len();
1659 if len == 0 {
1660 return;
1661 }
1662 assert!(len.is_multiple_of(L0), "length must be multiple of L0");
1663
1664 let ptr = self_.as_mut_ptr();
1665 let mut write = 0usize;
1666 let mut read = 0usize;
1667
1668 unsafe {
1670 while read < len {
1671 let mut in_chunk = MaybeUninit::<[u8; L0]>::uninit();
1672 std::ptr::copy_nonoverlapping(ptr.add(read), (*in_chunk.as_mut_ptr()).as_mut_ptr(), L0);
1673 read += L0;
1674
1675 let out_chunk = reduce(in_chunk.assume_init());
1676
1677 std::ptr::copy_nonoverlapping(out_chunk.as_ptr(), ptr.add(write), L1);
1678 write += L1;
1679 }
1680 }
1681
1682 self.truncate(write);
1683 }
1684
1685 pub fn reduce_in_place_dyn(&mut self, in_chunk_len: usize, out_chunk_buf: &mut [u8], mut reduce: impl FnMut(&[u8], &mut [u8])) {
1695 assert!(out_chunk_buf.len() < in_chunk_len);
1696
1697 let self_ = &mut self[..];
1698
1699 let len = self_.len();
1700 if len == 0 {
1701 return;
1702 }
1703 assert!(len.is_multiple_of(in_chunk_len), "length must be multiple of in_chunk_len");
1704
1705 let ptr = self_.as_mut_ptr();
1706 let mut write = 0usize;
1707 let mut read = 0usize;
1708
1709 unsafe {
1711 while read < len {
1712 reduce(std::slice::from_raw_parts(ptr.add(read), in_chunk_len), &mut *out_chunk_buf);
1713 read += in_chunk_len;
1714
1715 std::ptr::copy_nonoverlapping(out_chunk_buf.as_ptr(), ptr.add(write), out_chunk_buf.len());
1716 write += out_chunk_buf.len();
1717 }
1718 }
1719
1720 self.truncate(write);
1721 }
1722
1723 pub fn cast_reduce_in_place<T0, const L0: usize, T1, const L1: usize>(&mut self, mut reduce: impl FnMut([T0; L0]) -> [T1; L1])
1733 where
1734 T0: bytemuck::AnyBitPattern,
1735 {
1736 let l0 = std::mem::size_of::<T0>() * L0;
1737 let l1 = std::mem::size_of::<T1>() * L1;
1738 assert!(l1 <= l0);
1739
1740 let self_ = &mut self[..];
1741
1742 let len = self_.len();
1743 if len == 0 {
1744 return;
1745 }
1746 assert!(len.is_multiple_of(l0), "length must be multiple of size_of::<T0>() * L0");
1747
1748 let ptr = self_.as_mut_ptr();
1749 let mut write = 0usize;
1750 let mut read = 0usize;
1751
1752 unsafe {
1756 while read < len {
1757 let mut in_chunk = MaybeUninit::<[T0; L0]>::uninit();
1758 std::ptr::copy_nonoverlapping(ptr.add(read), (*in_chunk.as_mut_ptr()).as_mut_ptr() as _, l0);
1759 read += l0;
1760
1761 let out_chunk = reduce(in_chunk.assume_init());
1762
1763 std::ptr::copy_nonoverlapping(out_chunk.as_ptr() as _, ptr.add(write), l1);
1764 write += l1;
1765 }
1766 }
1767
1768 self.truncate(write);
1769 }
1770
1771 pub fn cast_reduce_in_place_dyn<T0, T1>(
1783 &mut self,
1784 in_chunk_len: usize,
1785 out_chunk_buf: &mut [T1],
1786 mut reduce: impl FnMut(&[T0], &mut [T1]),
1787 ) where
1788 T0: bytemuck::AnyBitPattern,
1789 {
1790 let l0 = std::mem::size_of::<T0>() * in_chunk_len;
1791 let l1 = std::mem::size_of_val(out_chunk_buf);
1792
1793 assert!(l1 <= l0);
1794
1795 let self_ = &mut self[..];
1796
1797 let len = self_.len();
1798 if len == 0 {
1799 return;
1800 }
1801 assert!(len.is_multiple_of(l0), "length must be multiple of size_of::<T0>() * in_chunk_len");
1802
1803 let ptr = self_.as_mut_ptr();
1804 let mut write = 0usize;
1805 let mut read = 0usize;
1806
1807 unsafe {
1809 while read < len {
1810 reduce(
1811 bytemuck::cast_slice(std::slice::from_raw_parts(ptr.add(read), l0)),
1812 &mut *out_chunk_buf,
1813 );
1814 read += l0;
1815
1816 std::ptr::copy_nonoverlapping(out_chunk_buf.as_ptr() as _, ptr.add(write), l1);
1817 write += l1;
1818 }
1819 }
1820
1821 self.truncate(write);
1822 }
1823
1824 pub fn reverse_chunks<const L: usize>(&mut self) {
1832 let self_ = &mut self[..];
1833
1834 let len = self_.len();
1835
1836 if len == 0 || L == 0 {
1837 return;
1838 }
1839
1840 if L == 1 {
1841 return self_.reverse();
1842 }
1843
1844 assert!(len.is_multiple_of(L), "length must be multiple of L");
1845
1846 unsafe { self_.as_chunks_unchecked_mut::<L>() }.reverse();
1848 }
1849
1850 pub fn reverse_chunks_dyn(&mut self, chunk_len: usize) {
1856 let self_ = &mut self[..];
1857
1858 let len = self_.len();
1859
1860 if len == 0 || chunk_len == 0 {
1861 return;
1862 }
1863
1864 if chunk_len == 1 {
1865 return self_.reverse();
1866 }
1867
1868 assert!(len.is_multiple_of(chunk_len), "length must be multiple of chunk_len");
1869
1870 let mut a = 0;
1871 let mut b = len - chunk_len;
1872
1873 let ptr = self_.as_mut_ptr();
1874
1875 unsafe {
1877 while a < b {
1878 std::ptr::swap_nonoverlapping(ptr.add(a), ptr.add(b), chunk_len);
1879 a += chunk_len;
1880 b -= chunk_len;
1881 }
1882 }
1883 }
1884}
1885
// iterator over the bytes of the owned `IpcBytes`, the self-referential dependent below
type SliceIter<'a> = std::slice::Iter<'a, u8>;
// owner/dependent pair, the slice iterator borrows from the `IpcBytes` stored next to it
self_cell::self_cell! {
    struct IpcBytesIntoIterInner {
        owner: IpcBytes,
        #[covariant]
        dependent: SliceIter,
    }
}
1897
/// Owned byte iterator for [`IpcBytes`].
pub struct IpcBytesIntoIter(IpcBytesIntoIterInner);
impl IpcBytesIntoIter {
    fn new(bytes: IpcBytes) -> Self {
        Self(IpcBytesIntoIterInner::new(bytes, |b| b.iter()))
    }

    /// Reference the source data.
    pub fn source(&self) -> &IpcBytes {
        self.0.borrow_owner()
    }

    /// The bytes not yet iterated.
    pub fn rest(&self) -> &[u8] {
        self.0.borrow_dependent().as_slice()
    }
}
1915impl Iterator for IpcBytesIntoIter {
1916 type Item = u8;
1917
1918 fn next(&mut self) -> Option<u8> {
1919 self.0.with_dependent_mut(|_, d| d.next().copied())
1920 }
1921
1922 fn size_hint(&self) -> (usize, Option<usize>) {
1923 self.0.borrow_dependent().size_hint()
1924 }
1925
1926 fn count(self) -> usize
1927 where
1928 Self: Sized,
1929 {
1930 self.0.borrow_dependent().as_slice().len()
1931 }
1932
1933 fn nth(&mut self, n: usize) -> Option<u8> {
1934 self.0.with_dependent_mut(|_, d| d.nth(n).copied())
1935 }
1936
1937 fn last(mut self) -> Option<Self::Item>
1938 where
1939 Self: Sized,
1940 {
1941 self.next_back()
1942 }
1943}
1944impl DoubleEndedIterator for IpcBytesIntoIter {
1945 fn next_back(&mut self) -> Option<Self::Item> {
1946 self.0.with_dependent_mut(|_, d| d.next_back().copied())
1947 }
1948
1949 fn nth_back(&mut self, n: usize) -> Option<Self::Item> {
1950 self.0.with_dependent_mut(|_, d| d.nth_back(n).copied())
1951 }
1952}
1953impl FusedIterator for IpcBytesIntoIter {}
1954impl Default for IpcBytesIntoIter {
1955 fn default() -> Self {
1956 IpcBytes::empty().into_iter()
1957 }
1958}
impl IntoIterator for IpcBytes {
    type Item = u8;

    type IntoIter = IpcBytesIntoIter;

    /// Iterates the bytes, the iterator keeps the buffer alive.
    fn into_iter(self) -> Self::IntoIter {
        IpcBytesIntoIter::new(self)
    }
}
1968
/// Owned typed iterator for [`IpcBytesCast`].
pub struct IpcBytesCastIntoIter<T: bytemuck::AnyBitPattern>(IpcBytesIntoIter, IpcBytesCast<T>);
1971impl<T: bytemuck::AnyBitPattern> IpcBytesCastIntoIter<T> {
1972 fn new(bytes: IpcBytesCast<T>) -> Self {
1973 Self(bytes.bytes.clone().into_iter(), bytes)
1974 }
1975
1976 pub fn source(&self) -> &IpcBytesCast<T> {
1978 &self.1
1979 }
1980
1981 pub fn rest(&self) -> &[T] {
1983 bytemuck::cast_slice(self.0.rest())
1984 }
1985}
impl<T: bytemuck::AnyBitPattern> Iterator for IpcBytesCastIntoIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<T> {
        let size = size_of::<T>();
        // read one `T` from the front of the remaining bytes, `None` when exhausted
        let r = *bytemuck::from_bytes(self.0.rest().get(..size)?);
        // advance the byte iterator past the item just read
        self.0.nth(size - 1);
        Some(r)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // convert the byte hints to item counts
        let (mut min, mut max) = self.0.size_hint();
        min /= size_of::<T>();
        if let Some(max) = &mut max {
            *max /= size_of::<T>();
        }
        (min, max)
    }

    fn nth(&mut self, n: usize) -> Option<T> {
        let size = size_of::<T>();

        // checked math, overflow maps to `None` like any other out of range index
        let byte_skip = n.checked_mul(size)?;
        let byte_end = byte_skip.checked_add(size)?;

        let bytes = self.0.rest().get(byte_skip..byte_end)?;
        let r = *bytemuck::from_bytes(bytes);

        // consume the skipped bytes and the item itself
        self.0.nth(byte_end - 1);

        Some(r)
    }

    fn last(mut self) -> Option<Self::Item>
    where
        Self: Sized,
    {
        // the last remaining item is the first one from the back
        self.next_back()
    }
}
impl<T: bytemuck::AnyBitPattern> DoubleEndedIterator for IpcBytesCastIntoIter<T> {
    fn next_back(&mut self) -> Option<T> {
        let size = size_of::<T>();

        let len = self.0.rest().len();
        if len < size {
            return None;
        }

        // read one `T` from the back of the remaining bytes
        let start = len - size;
        let bytes = &self.0.rest()[start..];
        let r = *bytemuck::from_bytes(bytes);

        // consume the item bytes from the back
        self.0.nth_back(size - 1);

        Some(r)
    }

    fn nth_back(&mut self, n: usize) -> Option<T> {
        let size = size_of::<T>();

        // checked math, overflow maps to `None` like any other out of range index
        let rev_byte_skip = n.checked_mul(size)?;
        let rev_byte_end = rev_byte_skip.checked_add(size)?;
        let len = self.0.rest().len();

        if len < rev_byte_end {
            return None;
        }

        // the `n`th item from the back, in byte offsets from the front
        let start = len - rev_byte_end;
        let end = len - rev_byte_skip;

        let bytes = &self.0.rest()[start..end];
        let r = *bytemuck::from_bytes(bytes);

        // consume the skipped bytes and the item itself from the back
        self.0.nth_back(rev_byte_end - 1);

        Some(r)
    }
}
2066impl<T: bytemuck::AnyBitPattern> FusedIterator for IpcBytesCastIntoIter<T> {}
2067impl<T: bytemuck::AnyBitPattern> Default for IpcBytesCastIntoIter<T> {
2068 fn default() -> Self {
2069 IpcBytes::empty().cast::<T>().into_iter()
2070 }
2071}
impl<T: bytemuck::AnyBitPattern> IntoIterator for IpcBytesCast<T> {
    type Item = T;

    type IntoIter = IpcBytesCastIntoIter<T>;

    /// Iterates the items, the iterator keeps the buffer alive.
    fn into_iter(self) -> Self::IntoIter {
        IpcBytesCastIntoIter::new(self)
    }
}