#![cfg_attr(not(ipc), allow(unused))]

use std::{
    cell::Cell,
    fmt, fs,
    io::{self, Read, Seek, Write},
    marker::PhantomData,
    mem::MaybeUninit,
    ops,
    path::{Path, PathBuf},
    pin::Pin,
    sync::{Arc, Weak},
};

use futures_lite::AsyncReadExt;
#[cfg(ipc)]
use ipc_channel::ipc::IpcSharedMemory;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize, de::VariantAccess};
use zng_app_context::RunOnDrop;

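/// Shared immutable byte buffer that can be efficiently transferred between processes.
///
/// Depending on how it is created and on its length the data is stored on the heap,
/// in anonymous shared memory or in a memory mapped file.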
#[derive(Clone)]
#[repr(transparent)]
pub struct IpcBytes(Arc<IpcBytesData>);
enum IpcBytesData {
    Heap(Vec<u8>),
    #[cfg(ipc)]
    AnonMemMap(IpcSharedMemory),
    #[cfg(ipc)]
    MemMap(IpcMemMap),
}
#[cfg(ipc)]
struct IpcMemMap {
    name: PathBuf,
    range: ops::Range<usize>,
    is_custom: bool,
    map: Option<memmap2::Mmap>,
    read_handle: Option<fs::File>,
}
impl fmt::Debug for IpcBytes {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "IpcBytes(<{} bytes>)", self.len())
    }
}
impl ops::Deref for IpcBytes {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        match &*self.0 {
            IpcBytesData::Heap(i) => i,
            #[cfg(ipc)]
            IpcBytesData::AnonMemMap(m) => m,
            #[cfg(ipc)]
            IpcBytesData::MemMap(f) => f.map.as_ref().unwrap(),
        }
    }
}

impl IpcBytes {
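    /// New empty (zero length) bytes.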
    pub fn empty() -> Self {
        IpcBytes(Arc::new(IpcBytesData::Heap(vec![])))
    }
}
impl IpcBytes {
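    /// Starts an async writer that collects written bytes into a new `IpcBytes`.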
    pub async fn new_writer() -> IpcBytesWriter {
        IpcBytesWriter {
            inner: blocking::Unblock::new(Self::new_writer_blocking()),
        }
    }

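    /// Allocates a mutable buffer of `len` zeroed bytes that can later be converted into `IpcBytes`.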
    pub async fn new_mut(len: usize) -> io::Result<IpcBytesMut> {
        #[cfg(ipc)]
        if len <= Self::INLINE_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        } else if len <= Self::UNNAMED_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
            })
        } else {
            blocking::unblock(move || Self::new_mut_blocking(len)).await
        }

        #[cfg(not(ipc))]
        {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        }
    }

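    /// Creates `IpcBytes` from a byte vector.
    ///
    /// Small vectors are kept on the heap, larger payloads are copied into shared memory or a memory mapped file.
    ///
    /// Minimal usage sketch, not compiled as a doc test, assumes an async context with error propagation:
    ///
    /// ```ignore
    /// let bytes = IpcBytes::from_vec(vec![1u8, 2, 3]).await?;
    /// assert_eq!(&bytes[..], &[1, 2, 3]);
    /// ```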
    pub async fn from_vec(data: Vec<u8>) -> io::Result<Self> {
        blocking::unblock(move || Self::from_vec_blocking(data)).await
    }

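    /// Creates `IpcBytes` by reading `data` to the end.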
    pub async fn from_read(data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            Self::from_read_ipc(data).await
        }
        #[cfg(not(ipc))]
        {
            let mut data = data;
            let mut buf = vec![];
            data.read_to_end(&mut buf).await?;
            Self::from_vec(buf).await
        }
    }
    #[cfg(ipc)]
    async fn from_read_ipc(mut data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
        let mut buf = vec![0u8; Self::INLINE_MAX + 1];
        let mut len = 0;

        loop {
            match data.read(&mut buf[len..]).await {
                Ok(l) => {
                    if l == 0 {
                        buf.truncate(len);
                        return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
                    } else {
                        len += l;
                        if len == Self::INLINE_MAX + 1 {
                            break;
                        }
                    }
                }
                Err(e) => match e.kind() {
                    io::ErrorKind::WouldBlock => continue,
                    _ => return Err(e),
                },
            }
        }

        buf.resize(Self::UNNAMED_MAX + 1, 0);
        loop {
            match data.read(&mut buf[len..]).await {
                Ok(l) => {
                    if l == 0 {
                        return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
                    } else {
                        len += l;
                        if len == Self::UNNAMED_MAX + 1 {
                            break;
                        }
                    }
                }
                Err(e) => match e.kind() {
                    io::ErrorKind::WouldBlock => continue,
                    _ => return Err(e),
                },
            }
        }

        Self::new_memmap(async |m| {
            use futures_lite::AsyncWriteExt as _;

            m.write_all(&buf).await?;
            crate::io::copy(data, m).await?;
            Ok(())
        })
        .await
    }

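    /// Creates `IpcBytes` by reading the contents of the `file` path.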
    pub async fn from_file(file: PathBuf) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            let mut file = crate::fs::File::open(file).await?;
            let len = file.metadata().await?.len();
            if len <= Self::UNNAMED_MAX as u64 {
                let mut buf = vec![0u8; len as usize];
                file.read_exact(&mut buf).await?;
                Self::from_vec_blocking(buf)
            } else {
                Self::new_memmap(async move |m| {
                    crate::io::copy(&mut file, m).await?;
                    Ok(())
                })
                .await
            }
        }
        #[cfg(not(ipc))]
        {
            let mut file = crate::fs::File::open(file).await?;
            let mut buf = vec![];
            file.read_to_end(&mut buf).await?;
            Self::from_vec_blocking(buf)
        }
    }

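    /// Creates `IpcBytes` backed by a new memory mapped file that is filled by the `write` closure.
    ///
    /// After `write` returns the file is made read-only and mapped.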
    #[cfg(ipc)]
    pub async fn new_memmap(write: impl AsyncFnOnce(&mut crate::fs::File) -> io::Result<()>) -> io::Result<Self> {
        let (name, file) = blocking::unblock(Self::create_memmap).await?;
        let mut file = crate::fs::File::from(file);
        write(&mut file).await?;

        let mut permissions = file.metadata().await?.permissions();
        permissions.set_readonly(true);
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            permissions.set_mode(0o400);
        }
        file.set_permissions(permissions).await?;

        blocking::unblock(move || {
            drop(file);
            let map = IpcMemMap::read(name, None)?;
            Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
        })
        .await
    }

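    /// Creates `IpcBytes` by memory mapping an existing `file`, optionally restricted to `range`.
    ///
    /// # Safety
    ///
    /// The file contents must not be modified while the mapping is alive, see [`memmap2::Mmap::map`] for details.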
    #[cfg(ipc)]
    pub async unsafe fn open_memmap(file: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
        blocking::unblock(move || {
            unsafe { Self::open_memmap_blocking(file, range) }
        })
        .await
    }

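    /// Returns `true` if both instances point to the same bytes or are both empty.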
    pub fn ptr_eq(&self, other: &Self) -> bool {
        let a = &self[..];
        let b = &other[..];
        (std::ptr::eq(a, b) && a.len() == b.len()) || (a.is_empty() && b.is_empty())
    }

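    // Size thresholds: payloads up to `INLINE_MAX` stay on the heap and are serialized inline,
    // payloads up to `UNNAMED_MAX` use anonymous shared memory, anything larger uses a named
    // memory mapped file.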
    #[cfg(ipc)]
    const INLINE_MAX: usize = 64 * 1024;
    #[cfg(ipc)]
    const UNNAMED_MAX: usize = 128 * 1024 * 1024;
}

impl IpcBytes {
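    /// Blocking variant of [`new_writer`](Self::new_writer).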
    pub fn new_writer_blocking() -> IpcBytesWriterBlocking {
        IpcBytesWriterBlocking {
            #[cfg(ipc)]
            heap_buf: vec![],
            #[cfg(ipc)]
            memmap: None,

            #[cfg(not(ipc))]
            heap_buf: std::io::Cursor::new(vec![]),
        }
    }

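    /// Blocking variant of [`new_mut`](Self::new_mut).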
    pub fn new_mut_blocking(len: usize) -> io::Result<IpcBytesMut> {
        #[cfg(ipc)]
        if len <= Self::INLINE_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        } else if len <= Self::UNNAMED_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
            })
        } else {
            let (name, file) = Self::create_memmap()?;
            file.lock()?;
            #[cfg(unix)]
            {
                let mut permissions = file.metadata()?.permissions();
                use std::os::unix::fs::PermissionsExt;
                permissions.set_mode(0o600);
                file.set_permissions(permissions)?;
            }
            file.set_len(len as u64)?;
            let map = unsafe { memmap2::MmapMut::map_mut(&file) }?;
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::MemMap {
                    name,
                    map,
                    write_handle: file,
                },
            })
        }
        #[cfg(not(ipc))]
        {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        }
    }

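    /// Blocking variant of [`from_vec`](Self::from_vec) that copies from a slice.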
    pub fn from_slice_blocking(data: &[u8]) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            if data.len() <= Self::INLINE_MAX {
                Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
            } else if data.len() <= Self::UNNAMED_MAX {
                Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(data)))))
            } else {
                Self::new_memmap_blocking(|m| m.write_all(data))
            }
        }
        #[cfg(not(ipc))]
        {
            Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
        }
    }

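    /// Blocking variant of [`from_vec`](Self::from_vec).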
    pub fn from_vec_blocking(data: Vec<u8>) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            if data.len() <= Self::INLINE_MAX {
                Ok(Self(Arc::new(IpcBytesData::Heap(data))))
            } else {
                Self::from_slice_blocking(&data)
            }
        }
        #[cfg(not(ipc))]
        {
            Ok(Self(Arc::new(IpcBytesData::Heap(data))))
        }
    }

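    /// Blocking variant of [`from_read`](Self::from_read).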
    pub fn from_read_blocking(data: &mut dyn io::Read) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            Self::from_read_blocking_ipc(data)
        }
        #[cfg(not(ipc))]
        {
            let mut buf = vec![];
            data.read_to_end(&mut buf)?;
            Self::from_vec_blocking(buf)
        }
    }
    #[cfg(ipc)]
    fn from_read_blocking_ipc(data: &mut dyn io::Read) -> io::Result<Self> {
        let mut buf = vec![0u8; Self::INLINE_MAX + 1];
        let mut len = 0;

        loop {
            match data.read(&mut buf[len..]) {
                Ok(l) => {
                    if l == 0 {
                        buf.truncate(len);
                        return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
                    } else {
                        len += l;
                        if len == Self::INLINE_MAX + 1 {
                            break;
                        }
                    }
                }
                Err(e) => match e.kind() {
                    io::ErrorKind::WouldBlock => continue,
                    _ => return Err(e),
                },
            }
        }

        buf.resize(Self::UNNAMED_MAX + 1, 0);
        loop {
            match data.read(&mut buf[len..]) {
                Ok(l) => {
                    if l == 0 {
                        return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
                    } else {
                        len += l;
                        if len == Self::UNNAMED_MAX + 1 {
                            break;
                        }
                    }
                }
                Err(e) => match e.kind() {
                    io::ErrorKind::WouldBlock => continue,
                    _ => return Err(e),
                },
            }
        }

        Self::new_memmap_blocking(|m| {
            m.write_all(&buf)?;
            io::copy(data, m)?;
            Ok(())
        })
    }

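    /// Blocking variant of [`from_file`](Self::from_file).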
    pub fn from_file_blocking(file: &Path) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            let mut file = fs::File::open(file)?;
            let len = file.metadata()?.len();
            if len <= Self::UNNAMED_MAX as u64 {
                let mut buf = vec![0u8; len as usize];
                file.read_exact(&mut buf)?;
                Self::from_vec_blocking(buf)
            } else {
                Self::new_memmap_blocking(|m| {
                    io::copy(&mut file, m)?;
                    Ok(())
                })
            }
        }
        #[cfg(not(ipc))]
        {
            let mut file = fs::File::open(file)?;
            let mut buf = vec![];
            file.read_to_end(&mut buf)?;
            Self::from_vec_blocking(buf)
        }
    }

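    /// Blocking variant of [`new_memmap`](Self::new_memmap).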
    #[cfg(ipc)]
    pub fn new_memmap_blocking(write: impl FnOnce(&mut fs::File) -> io::Result<()>) -> io::Result<Self> {
        let (name, mut file) = Self::create_memmap()?;
        write(&mut file)?;
        let mut permissions = file.metadata()?.permissions();
        permissions.set_readonly(true);
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            permissions.set_mode(0o400);
        }
        file.set_permissions(permissions)?;

        drop(file);
        let map = IpcMemMap::read(name, None)?;
        Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
    }
    #[cfg(ipc)]
    fn create_memmap() -> io::Result<(PathBuf, fs::File)> {
        static MEMMAP_DIR: Mutex<usize> = Mutex::new(0);
        let mut count = MEMMAP_DIR.lock();

        if *count == 0 {
            zng_env::on_process_exit(|_| {
                IpcBytes::cleanup_memmap_storage();
            });
        }

        let dir = zng_env::cache("zng-task-ipc-mem").join(std::process::id().to_string());
        fs::create_dir_all(&dir)?;
        let mut name = dir.join(count.to_string());
        if *count < usize::MAX {
            *count += 1;
        } else {
            for i in 0..usize::MAX {
                name = dir.join(i.to_string());
                if !name.exists() {
                    break;
                }
            }
            if name.exists() {
                return Err(io::Error::new(io::ErrorKind::StorageFull, ""));
            }
        };

        let file = fs::OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .truncate(true)
            .open(&name)?;
        Ok((name, file))
    }
    #[cfg(ipc)]
    fn cleanup_memmap_storage() {
        if let Ok(dir) = fs::read_dir(zng_env::cache("zng-task-ipc-mem")) {
            let entries: Vec<_> = dir.flatten().map(|e| e.path()).collect();
            for entry in entries {
                if entry.is_dir() {
                    fs::remove_dir_all(entry).ok();
                }
            }
        }
    }

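    /// Blocking variant of [`open_memmap`](Self::open_memmap).
    ///
    /// # Safety
    ///
    /// The file contents must not be modified while the mapping is alive.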
    #[cfg(ipc)]
    pub unsafe fn open_memmap_blocking(file: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
        let read_handle = fs::File::open(&file)?;
        read_handle.lock_shared()?;
        let len = read_handle.metadata()?.len();
        if let Some(range) = &range
            && len < range.end as u64
        {
            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "file length < range.end"));
        }
        let map = unsafe { memmap2::Mmap::map(&read_handle) }?;

        let range = range.unwrap_or_else(|| 0..map.len());

        Ok(Self(Arc::new(IpcBytesData::MemMap(IpcMemMap {
            name: file,
            range,
            read_handle: Some(read_handle),
            is_custom: true,
            map: Some(map),
        }))))
    }
}

impl AsRef<[u8]> for IpcBytes {
    fn as_ref(&self) -> &[u8] {
        &self[..]
    }
}
impl Default for IpcBytes {
    fn default() -> Self {
        Self::empty()
    }
}
impl PartialEq for IpcBytes {
    fn eq(&self, other: &Self) -> bool {
        self.ptr_eq(other) || self[..] == other[..]
    }
}
#[cfg(ipc)]
impl IpcMemMap {
    fn read(name: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
        let read_handle = fs::File::open(&name)?;
        read_handle.lock_shared()?;
        let map = unsafe { memmap2::Mmap::map(&read_handle) }?;

        let range = range.unwrap_or_else(|| 0..map.len());

        Ok(IpcMemMap {
            name,
            range,
            is_custom: false,
            read_handle: Some(read_handle),
            map: Some(map),
        })
    }
}
#[cfg(ipc)]
impl Serialize for IpcMemMap {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        (&self.name, self.range.clone()).serialize(serializer)
    }
}
#[cfg(ipc)]
impl<'de> Deserialize<'de> for IpcMemMap {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let (name, range) = <(PathBuf, ops::Range<usize>)>::deserialize(deserializer)?;
        IpcMemMap::read(name, Some(range)).map_err(|e| serde::de::Error::custom(format!("cannot load ipc memory map file, {e}")))
    }
}
#[cfg(ipc)]
impl Drop for IpcMemMap {
    fn drop(&mut self) {
        self.map.take();
        self.read_handle.take();
        if !self.is_custom {
            std::fs::remove_file(&self.name).ok();
        }
    }
}

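// Note on the `MemMap` serialization scheme: only the file path and range are sent, plus an IPC
// sender. The serializing side keeps a clone of the bytes alive until the receiver signals that it
// opened the file, so `IpcMemMap::drop` does not remove the backing file too early.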
impl Serialize for IpcBytes {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        #[cfg(ipc)]
        {
            if is_ipc_serialization() {
                match &*self.0 {
                    IpcBytesData::Heap(b) => serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&b[..])),
                    IpcBytesData::AnonMemMap(b) => serializer.serialize_newtype_variant("IpcBytes", 1, "AnonMemMap", b),
                    IpcBytesData::MemMap(b) => {
                        let (sender, mut recv) = crate::channel::ipc_unbounded::<()>()
                            .map_err(|e| serde::ser::Error::custom(format!("cannot serialize memmap bytes for ipc, {e}")))?;

                        let r = serializer.serialize_newtype_variant("IpcBytes", 2, "MemMap", &(b, sender))?;
                        let hold = self.clone();
                        crate::spawn_wait(move || {
                            if let Err(e) = recv.recv_blocking() {
                                tracing::error!("IpcBytes memmap completion signal not received, {e}")
                            }
                            drop(hold);
                        });
                        Ok(r)
                    }
                }
            } else {
                serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&self[..]))
            }
        }
        #[cfg(not(ipc))]
        {
            serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&self[..]))
        }
    }
}
impl<'de> Deserialize<'de> for IpcBytes {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        #[derive(Deserialize)]
        enum VariantId {
            Heap,
            #[cfg(ipc)]
            AnonMemMap,
            #[cfg(ipc)]
            MemMap,
        }

        struct EnumVisitor;
        impl<'de> serde::de::Visitor<'de> for EnumVisitor {
            type Value = IpcBytes;

            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                write!(f, "IpcBytes variant")
            }

            fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
            where
                A: serde::de::EnumAccess<'de>,
            {
                let (variant, access) = data.variant::<VariantId>()?;
                match variant {
                    VariantId::Heap => access.newtype_variant_seed(ByteSliceVisitor),
                    #[cfg(ipc)]
                    VariantId::AnonMemMap => Ok(IpcBytes(Arc::new(IpcBytesData::AnonMemMap(access.newtype_variant()?)))),
                    #[cfg(ipc)]
                    VariantId::MemMap => {
                        let (memmap, mut completion_sender): (IpcMemMap, crate::channel::IpcSender<()>) = access.newtype_variant()?;
                        completion_sender.send_blocking(()).map_err(|e| {
                            serde::de::Error::custom(format!("cannot deserialize memmap bytes, completion signal failed, {e}"))
                        })?;
                        Ok(IpcBytes(Arc::new(IpcBytesData::MemMap(memmap))))
                    }
                }
            }
        }
        struct ByteSliceVisitor;
        impl<'de> serde::de::Visitor<'de> for ByteSliceVisitor {
            type Value = IpcBytes;

            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                write!(f, "byte buffer")
            }

            fn visit_borrowed_bytes<E>(self, v: &'de [u8]) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                IpcBytes::from_slice_blocking(v).map_err(serde::de::Error::custom)
            }

            fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                IpcBytes::from_slice_blocking(v).map_err(serde::de::Error::custom)
            }

            fn visit_byte_buf<E>(self, v: Vec<u8>) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                IpcBytes::from_vec_blocking(v).map_err(serde::de::Error::custom)
            }
        }
        impl<'de> serde::de::DeserializeSeed<'de> for ByteSliceVisitor {
            type Value = IpcBytes;

            fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
            where
                D: serde::Deserializer<'de>,
            {
                deserializer.deserialize_bytes(ByteSliceVisitor)
            }
        }

        #[cfg(ipc)]
        {
            deserializer.deserialize_enum("IpcBytes", &["Heap", "AnonMemMap", "MemMap"], EnumVisitor)
        }
        #[cfg(not(ipc))]
        {
            deserializer.deserialize_enum("IpcBytes", &["Heap"], EnumVisitor)
        }
    }
}

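/// Enables IPC optimized serialization of [`IpcBytes`] for the duration of the `serialize` closure.
///
/// While enabled on the calling thread, shared memory and memory mapped bytes serialize as references
/// to the shared storage instead of inline byte buffers.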
#[cfg(ipc)]
pub fn with_ipc_serialization<R>(serialize: impl FnOnce() -> R) -> R {
    let parent = IPC_SERIALIZATION.replace(true);
    let _clean = RunOnDrop::new(|| IPC_SERIALIZATION.set(parent));
    serialize()
}

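/// Returns `true` if the current thread is inside [`with_ipc_serialization`].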
#[cfg(ipc)]
pub fn is_ipc_serialization() -> bool {
    IPC_SERIALIZATION.get()
}

#[cfg(ipc)]
thread_local! {
    static IPC_SERIALIZATION: Cell<bool> = const { Cell::new(false) };
}

impl IpcBytes {
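    /// Creates a weak reference to these bytes.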
    pub fn downgrade(&self) -> WeakIpcBytes {
        WeakIpcBytes(Arc::downgrade(&self.0))
    }
}

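/// Weak reference to [`IpcBytes`].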
pub struct WeakIpcBytes(Weak<IpcBytesData>);
impl WeakIpcBytes {
    pub fn upgrade(&self) -> Option<IpcBytes> {
        self.0.upgrade().map(IpcBytes)
    }

    pub fn strong_count(&self) -> usize {
        self.0.strong_count()
    }
}

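/// Async writer that collects written bytes into an [`IpcBytes`] or [`IpcBytesMut`].
///
/// Use [`IpcBytes::new_writer`] to create.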
pub struct IpcBytesWriter {
    inner: blocking::Unblock<IpcBytesWriterBlocking>,
}
impl IpcBytesWriter {
    pub async fn finish(self) -> std::io::Result<IpcBytes> {
        let inner = self.inner.into_inner().await;
        blocking::unblock(move || inner.finish()).await
    }

    pub async fn finish_mut(self) -> std::io::Result<IpcBytesMut> {
        let inner = self.inner.into_inner().await;
        blocking::unblock(move || inner.finish_mut()).await
    }
}
impl crate::io::AsyncWrite for IpcBytesWriter {
    fn poll_write(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8]) -> std::task::Poll<io::Result<usize>> {
        crate::io::AsyncWrite::poll_write(Pin::new(&mut Pin::get_mut(self).inner), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
        crate::io::AsyncWrite::poll_flush(Pin::new(&mut Pin::get_mut(self).inner), cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
        crate::io::AsyncWrite::poll_close(Pin::new(&mut Pin::get_mut(self).inner), cx)
    }
}
impl crate::io::AsyncSeek for IpcBytesWriter {
    fn poll_seek(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, pos: io::SeekFrom) -> std::task::Poll<io::Result<u64>> {
        crate::io::AsyncSeek::poll_seek(Pin::new(&mut Pin::get_mut(self).inner), cx, pos)
    }
}

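/// Blocking writer that collects written bytes into an [`IpcBytes`] or [`IpcBytesMut`].
///
/// Use [`IpcBytes::new_writer_blocking`] to create.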
pub struct IpcBytesWriterBlocking {
    #[cfg(ipc)]
    heap_buf: Vec<u8>,
    #[cfg(ipc)]
    memmap: Option<(PathBuf, std::fs::File)>,

    #[cfg(not(ipc))]
    heap_buf: std::io::Cursor<Vec<u8>>,
}
impl IpcBytesWriterBlocking {
    pub fn finish(self) -> std::io::Result<IpcBytes> {
        let m = self.finish_mut()?;
        m.finish_blocking()
    }

    pub fn finish_mut(mut self) -> std::io::Result<IpcBytesMut> {
        self.flush()?;
        #[cfg(ipc)]
        {
            let (len, inner) = match self.memmap {
                Some((name, write_handle)) => {
                    let map = unsafe { memmap2::MmapMut::map_mut(&write_handle) }?;
                    let len = map.len();
                    (len, IpcBytesMutInner::MemMap { name, map, write_handle })
                }
                None => {
                    let len = self.heap_buf.len();
                    let i = if self.heap_buf.len() > IpcBytes::INLINE_MAX {
                        IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_bytes(&self.heap_buf))
                    } else {
                        IpcBytesMutInner::Heap(self.heap_buf)
                    };
                    (len, i)
                }
            };
            Ok(IpcBytesMut { len, inner })
        }
        #[cfg(not(ipc))]
        {
            let heap_buf = self.heap_buf.into_inner();
            let len = heap_buf.len();
            let inner = IpcBytesMutInner::Heap(heap_buf);
            Ok(IpcBytesMut { len, inner })
        }
    }

    #[cfg(ipc)]
    fn alloc_memmap_file(&mut self) -> io::Result<()> {
        if self.memmap.is_none() {
            let (name, file) = IpcBytes::create_memmap()?;
            file.lock()?;
            #[cfg(unix)]
            {
                let mut permissions = file.metadata()?.permissions();
                use std::os::unix::fs::PermissionsExt;
                permissions.set_mode(0o600);
                file.set_permissions(permissions)?;
            }
            self.memmap = Some((name, file));
        }
        let file = &mut self.memmap.as_mut().unwrap().1;

        file.write_all(&self.heap_buf)?;
        self.heap_buf.clear();
        Ok(())
    }
}
impl std::io::Write for IpcBytesWriterBlocking {
    fn write(&mut self, write_buf: &[u8]) -> io::Result<usize> {
        #[cfg(ipc)]
        {
            if self.heap_buf.len() + write_buf.len() > IpcBytes::UNNAMED_MAX {
                self.alloc_memmap_file()?;

                if write_buf.len() > IpcBytes::UNNAMED_MAX {
                    self.memmap.as_mut().unwrap().1.write_all(write_buf)?;
                } else {
                    self.heap_buf.extend_from_slice(write_buf);
                }
            } else {
                if self.memmap.is_none() {
                    self.heap_buf
                        .reserve_exact((self.heap_buf.capacity().max(1024) * 2).min(IpcBytes::UNNAMED_MAX));
                }
                self.heap_buf.extend_from_slice(write_buf);
            }

            Ok(write_buf.len())
        }

        #[cfg(not(ipc))]
        {
            std::io::Write::write(&mut self.heap_buf, write_buf)
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        #[cfg(ipc)]
        if let Some((_, file)) = &mut self.memmap {
            if !self.heap_buf.is_empty() {
                file.write_all(&self.heap_buf)?;
                self.heap_buf.clear();
            }
            file.flush()?;
        }
        Ok(())
    }
}
impl std::io::Seek for IpcBytesWriterBlocking {
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        #[cfg(ipc)]
        {
            self.alloc_memmap_file()?;
            let (_, file) = self.memmap.as_mut().unwrap();
            if !self.heap_buf.is_empty() {
                file.write_all(&self.heap_buf)?;
                self.heap_buf.clear();
            }
            file.seek(pos)
        }
        #[cfg(not(ipc))]
        {
            std::io::Seek::seek(&mut self.heap_buf, pos)
        }
    }
}

enum IpcBytesMutInner {
    Heap(Vec<u8>),
    #[cfg(ipc)]
    AnonMemMap(IpcSharedMemory),
    #[cfg(ipc)]
    MemMap {
        name: PathBuf,
        map: memmap2::MmapMut,
        write_handle: std::fs::File,
    },
}

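/// Mutable byte buffer that can be converted into an immutable [`IpcBytes`].
///
/// Use [`IpcBytes::new_mut`] or [`IpcBytesMut::from_vec`] to create.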
pub struct IpcBytesMut {
    inner: IpcBytesMutInner,
    len: usize,
}
impl ops::Deref for IpcBytesMut {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        let len = self.len;
        match &self.inner {
            IpcBytesMutInner::Heap(v) => &v[..len],
            #[cfg(ipc)]
            IpcBytesMutInner::AnonMemMap(m) => &m[..len],
            #[cfg(ipc)]
            IpcBytesMutInner::MemMap { map, .. } => &map[..len],
        }
    }
}
impl ops::DerefMut for IpcBytesMut {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let len = self.len;
        match &mut self.inner {
            IpcBytesMutInner::Heap(v) => &mut v[..len],
            #[cfg(ipc)]
            IpcBytesMutInner::AnonMemMap(m) => {
                // SAFETY: this is the only handle to the shared memory, it has not been sent to any other process.
                &mut unsafe { m.deref_mut() }[..len]
            }
            #[cfg(ipc)]
            IpcBytesMutInner::MemMap { map, .. } => &mut map[..len],
        }
    }
}
impl fmt::Debug for IpcBytesMut {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "IpcBytesMut(<{} bytes>)", self.len())
    }
}
impl IpcBytesMut {
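    /// Creates a mutable buffer initialized with `buf`.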
    pub async fn from_vec(buf: Vec<u8>) -> io::Result<Self> {
        #[cfg(ipc)]
        if buf.len() <= IpcBytes::INLINE_MAX {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        } else {
            blocking::unblock(move || {
                let mut b = IpcBytes::new_mut_blocking(buf.len())?;
                b[..].copy_from_slice(&buf);
                Ok(b)
            })
            .await
        }
        #[cfg(not(ipc))]
        {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        }
    }

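    /// Converts into read-only [`IpcBytes`], reusing the allocation when possible.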
    pub async fn finish(mut self) -> io::Result<IpcBytes> {
        let len = self.len;
        let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
            IpcBytesMutInner::Heap(mut v) => {
                v.truncate(len);
                v.shrink_to_fit();
                IpcBytesData::Heap(v)
            }
            #[cfg(ipc)]
            IpcBytesMutInner::AnonMemMap(m) => {
                if len < IpcBytes::INLINE_MAX {
                    IpcBytesData::Heap(m[..len].to_vec())
                } else if len < m.len() {
                    IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&m[..len]))
                } else {
                    IpcBytesData::AnonMemMap(m)
                }
            }
            #[cfg(ipc)]
            IpcBytesMutInner::MemMap { name, map, write_handle } => {
                let len = self.len;
                blocking::unblock(move || Self::finish_memmap(name, map, write_handle, len)).await?
            }
        };
        Ok(IpcBytes(Arc::new(data)))
    }

    #[cfg(ipc)]
    fn finish_memmap(name: PathBuf, map: memmap2::MmapMut, write_handle: fs::File, len: usize) -> Result<IpcBytesData, io::Error> {
        let alloc_len = map.len();
        if alloc_len != len {
            write_handle.set_len(len as u64)?;
        }
        write_handle.unlock()?;
        let map = if alloc_len != len {
            drop(map);
            unsafe { memmap2::Mmap::map(&write_handle) }?
        } else {
            map.make_read_only()?
        };
        let mut permissions = write_handle.metadata()?.permissions();
        permissions.set_readonly(true);
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            permissions.set_mode(0o400);
        }
        write_handle.set_permissions(permissions)?;
        drop(write_handle);
        let read_handle = std::fs::File::open(&name)?;
        read_handle.lock_shared()?;
        Ok(IpcBytesData::MemMap(IpcMemMap {
            name,
            range: 0..len,
            is_custom: false,
            map: Some(map),
            read_handle: Some(read_handle),
        }))
    }
}
impl IpcBytesMut {
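    /// Blocking variant of [`from_vec`](Self::from_vec).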
    pub fn from_vec_blocking(buf: Vec<u8>) -> io::Result<Self> {
        #[cfg(ipc)]
        if buf.len() <= IpcBytes::INLINE_MAX {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        } else {
            let mut b = IpcBytes::new_mut_blocking(buf.len())?;
            b[..].copy_from_slice(&buf);
            Ok(b)
        }
        #[cfg(not(ipc))]
        {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        }
    }

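    /// Blocking variant of [`finish`](Self::finish).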
    pub fn finish_blocking(mut self) -> io::Result<IpcBytes> {
        let len = self.len;
        let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
            IpcBytesMutInner::Heap(mut v) => {
                v.truncate(len);
                IpcBytesData::Heap(v)
            }
            #[cfg(ipc)]
            IpcBytesMutInner::AnonMemMap(m) => {
                if len < IpcBytes::INLINE_MAX {
                    IpcBytesData::Heap(m[..len].to_vec())
                } else if len < m.len() {
                    IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&m[..len]))
                } else {
                    IpcBytesData::AnonMemMap(m)
                }
            }
            #[cfg(ipc)]
            IpcBytesMutInner::MemMap { name, map, write_handle } => Self::finish_memmap(name, map, write_handle, len)?,
        };
        Ok(IpcBytes(Arc::new(data)))
    }
}
#[cfg(ipc)]
impl Drop for IpcBytesMut {
    fn drop(&mut self) {
        if let IpcBytesMutInner::MemMap { name, map, write_handle } = std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
            drop(map);
            drop(write_handle);
            std::fs::remove_file(name).ok();
        }
    }
}

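/// [`IpcBytesMut`] viewed as a mutable slice of `T`.
///
/// Use [`IpcBytesMut::cast`] to create.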
pub struct IpcBytesMutCast<T: bytemuck::AnyBitPattern> {
    bytes: IpcBytesMut,
    _t: PhantomData<T>,
}
impl<T: bytemuck::AnyBitPattern> ops::Deref for IpcBytesMutCast<T> {
    type Target = [T];

    fn deref(&self) -> &Self::Target {
        bytemuck::cast_slice::<u8, T>(&self.bytes)
    }
}
impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> ops::DerefMut for IpcBytesMutCast<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        bytemuck::cast_slice_mut::<u8, T>(&mut self.bytes)
    }
}
impl<T: bytemuck::AnyBitPattern> IpcBytesMutCast<T> {
    pub fn into_inner(self) -> IpcBytesMut {
        self.bytes
    }
}
impl<T: bytemuck::AnyBitPattern> From<IpcBytesMutCast<T>> for IpcBytesMut {
    fn from(value: IpcBytesMutCast<T>) -> Self {
        value.bytes
    }
}
impl IpcBytesMut {
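    /// Casts the bytes into a mutable typed view over `T` values.
    ///
    /// # Panics
    ///
    /// Panics if the length is not a multiple of `size_of::<T>()` or the data is not aligned for `T`.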
    pub fn cast<T: bytemuck::AnyBitPattern>(self) -> IpcBytesMutCast<T> {
        let r = IpcBytesMutCast {
            bytes: self,
            _t: PhantomData,
        };
        let _assert = &r[..];
        r
    }

    pub fn cast_deref<T: bytemuck::AnyBitPattern>(&self) -> &[T] {
        bytemuck::cast_slice(self)
    }

    pub fn cast_deref_mut<T: bytemuck::AnyBitPattern + bytemuck::NoUninit>(&mut self) -> &mut [T] {
        bytemuck::cast_slice_mut(self)
    }
}

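/// [`IpcBytes`] viewed as a slice of `T`.
///
/// Use [`IpcBytes::cast`] to create.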
pub struct IpcBytesCast<T: bytemuck::AnyBitPattern> {
    bytes: IpcBytes,
    _t: PhantomData<T>,
}
impl<T: bytemuck::AnyBitPattern> ops::Deref for IpcBytesCast<T> {
    type Target = [T];

    fn deref(&self) -> &Self::Target {
        bytemuck::cast_slice::<u8, T>(&self.bytes)
    }
}
impl<T: bytemuck::AnyBitPattern> IpcBytesCast<T> {
    pub fn into_inner(self) -> IpcBytes {
        self.bytes
    }
}
impl<T: bytemuck::AnyBitPattern> From<IpcBytesCast<T>> for IpcBytes {
    fn from(value: IpcBytesCast<T>) -> Self {
        value.bytes
    }
}
impl IpcBytes {
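    /// Casts the bytes into a typed view over `T` values.
    ///
    /// # Panics
    ///
    /// Panics if the length is not a multiple of `size_of::<T>()` or the data is not aligned for `T`.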
    pub fn cast<T: bytemuck::AnyBitPattern>(self) -> IpcBytesCast<T> {
        let r = IpcBytesCast {
            bytes: self,
            _t: PhantomData,
        };
        let _assert = &r[..];
        r
    }

    pub fn cast_deref<T: bytemuck::AnyBitPattern>(&self) -> &[T] {
        bytemuck::cast_slice(self)
    }
}

impl IpcBytesMut {
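    /// Reduces the length to `new_len`, if it is less than the current length.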
    pub fn truncate(&mut self, new_len: usize) {
        self.len = self.len.min(new_len);
    }

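    /// Maps each chunk of `L0` bytes to a chunk of `L1` bytes in place and truncates to the reduced length.
    ///
    /// # Panics
    ///
    /// Panics if `L1 > L0` or if the length is not a multiple of `L0`.
    ///
    /// Usage sketch, not compiled as a doc test, drops the alpha channel from RGBA pixel bytes:
    ///
    /// ```ignore
    /// bytes.reduce_in_place::<4, 3>(|[r, g, b, _a]| [r, g, b]);
    /// ```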
    pub fn reduce_in_place<const L0: usize, const L1: usize>(&mut self, mut reduce: impl FnMut([u8; L0]) -> [u8; L1]) {
        assert!(L1 <= L0);

        let self_ = &mut self[..];

        let len = self_.len();
        if len == 0 {
            return;
        }
        assert!(len.is_multiple_of(L0), "length must be multiple of L0");

        let ptr = self_.as_mut_ptr();
        let mut write = 0usize;
        let mut read = 0usize;

        unsafe {
            // SAFETY: `len` is a multiple of `L0` so every read chunk is in bounds, `write` never passes `read`
            // and `L1 <= L0` so writes stay in bounds, and chunks go through stack buffers so the raw copies never overlap.
            while read < len {
                let mut in_chunk = MaybeUninit::<[u8; L0]>::uninit();
                std::ptr::copy_nonoverlapping(ptr.add(read), (*in_chunk.as_mut_ptr()).as_mut_ptr(), L0);
                read += L0;

                let out_chunk = reduce(in_chunk.assume_init());

                std::ptr::copy_nonoverlapping(out_chunk.as_ptr(), ptr.add(write), L1);
                write += L1;
            }
        }

        self.truncate(write);
    }

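    /// Dynamic variant of [`reduce_in_place`](Self::reduce_in_place), the output chunk length is `out_chunk_buf.len()`.
    ///
    /// # Panics
    ///
    /// Panics if `out_chunk_buf` is not smaller than `in_chunk_len` or if the length is not a multiple of `in_chunk_len`.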
    pub fn reduce_in_place_dyn(&mut self, in_chunk_len: usize, out_chunk_buf: &mut [u8], mut reduce: impl FnMut(&[u8], &mut [u8])) {
        assert!(out_chunk_buf.len() < in_chunk_len);

        let self_ = &mut self[..];

        let len = self_.len();
        if len == 0 {
            return;
        }
        assert!(len.is_multiple_of(in_chunk_len), "length must be multiple of in_chunk_len");

        let ptr = self_.as_mut_ptr();
        let mut write = 0usize;
        let mut read = 0usize;

        unsafe {
            // SAFETY: `len` is a multiple of `in_chunk_len` so reads are in bounds, `write` never passes `read` and the
            // output chunk is smaller so writes stay in bounds, and `out_chunk_buf` cannot alias `self` so the copy never overlaps.
            while read < len {
                reduce(std::slice::from_raw_parts(ptr.add(read), in_chunk_len), &mut *out_chunk_buf);
                read += in_chunk_len;

                std::ptr::copy_nonoverlapping(out_chunk_buf.as_ptr(), ptr.add(write), out_chunk_buf.len());
                write += out_chunk_buf.len();
            }
        }

        self.truncate(write);
    }

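    /// Like [`reduce_in_place`](Self::reduce_in_place), but with chunks typed as `[T0; L0]` and `[T1; L1]`.
    ///
    /// # Panics
    ///
    /// Panics if the output chunk is larger than the input chunk or if the length is not a multiple of the input chunk size.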
    pub fn cast_reduce_in_place<T0, const L0: usize, T1, const L1: usize>(&mut self, mut reduce: impl FnMut([T0; L0]) -> [T1; L1])
    where
        T0: bytemuck::AnyBitPattern,
    {
        let l0 = std::mem::size_of::<T0>() * L0;
        let l1 = std::mem::size_of::<T1>() * L1;
        assert!(l1 <= l0);

        let self_ = &mut self[..];

        let len = self_.len();
        if len == 0 {
            return;
        }
        assert!(len.is_multiple_of(l0), "length must be multiple of size_of::<T0>() * L0");

        let ptr = self_.as_mut_ptr();
        let mut write = 0usize;
        let mut read = 0usize;

        unsafe {
            // SAFETY: `len` is a multiple of `l0` so every read chunk is in bounds, `write` never passes `read` and
            // `l1 <= l0` so writes stay in bounds, chunks go through stack buffers so the raw copies never overlap,
            // and `T0: AnyBitPattern` so the copied bytes are a valid value for `assume_init`.
            while read < len {
                let mut in_chunk = MaybeUninit::<[T0; L0]>::uninit();
                std::ptr::copy_nonoverlapping(ptr.add(read), (*in_chunk.as_mut_ptr()).as_mut_ptr() as _, l0);
                read += l0;

                let out_chunk = reduce(in_chunk.assume_init());

                std::ptr::copy_nonoverlapping(out_chunk.as_ptr() as _, ptr.add(write), l1);
                write += l1;
            }
        }

        self.truncate(write);
    }

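    /// Dynamic variant of [`cast_reduce_in_place`](Self::cast_reduce_in_place), the output chunk is `out_chunk_buf`.
    ///
    /// # Panics
    ///
    /// Panics if the output chunk is larger than the input chunk or if the length is not a multiple of the input chunk size.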
    pub fn cast_reduce_in_place_dyn<T0, T1>(
        &mut self,
        in_chunk_len: usize,
        out_chunk_buf: &mut [T1],
        mut reduce: impl FnMut(&[T0], &mut [T1]),
    ) where
        T0: bytemuck::AnyBitPattern,
    {
        let l0 = std::mem::size_of::<T0>() * in_chunk_len;
        let l1 = std::mem::size_of_val(out_chunk_buf);

        assert!(l1 <= l0);

        let self_ = &mut self[..];

        let len = self_.len();
        if len == 0 {
            return;
        }
        assert!(len.is_multiple_of(l0), "length must be multiple of size_of::<T0>() * in_chunk_len");

        let ptr = self_.as_mut_ptr();
        let mut write = 0usize;
        let mut read = 0usize;

        unsafe {
            // SAFETY: `len` is a multiple of `l0` so reads are in bounds, `write` never passes `read` and `l1 <= l0`
            // so writes stay in bounds, and `out_chunk_buf` cannot alias `self` so the copy never overlaps.
            while read < len {
                reduce(
                    bytemuck::cast_slice(std::slice::from_raw_parts(ptr.add(read), l0)),
                    &mut *out_chunk_buf,
                );
                read += l0;

                std::ptr::copy_nonoverlapping(out_chunk_buf.as_ptr() as _, ptr.add(write), l1);
                write += l1;
            }
        }

        self.truncate(write);
    }

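    /// Reverses the order of `L` sized chunks.
    ///
    /// # Panics
    ///
    /// Panics if the length is not a multiple of `L`.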
    pub fn reverse_chunks<const L: usize>(&mut self) {
        let self_ = &mut self[..];

        let len = self_.len();

        if len == 0 || L == 0 {
            return;
        }

        if L == 1 {
            return self_.reverse();
        }

        assert!(len.is_multiple_of(L), "length must be multiple of L");

        // SAFETY: the length is asserted to be a multiple of `L`.
        unsafe { self_.as_chunks_unchecked_mut::<L>() }.reverse();
    }

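    /// Dynamic variant of [`reverse_chunks`](Self::reverse_chunks).
    ///
    /// # Panics
    ///
    /// Panics if the length is not a multiple of `chunk_len`.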
    pub fn reverse_chunks_dyn(&mut self, chunk_len: usize) {
        let self_ = &mut self[..];

        let len = self_.len();

        if len == 0 || chunk_len == 0 {
            return;
        }

        if chunk_len == 1 {
            return self_.reverse();
        }

        assert!(len.is_multiple_of(chunk_len), "length must be multiple of chunk_len");

        let mut a = 0;
        let mut b = len - chunk_len;

        let ptr = self_.as_mut_ptr();

        unsafe {
            // SAFETY: `a` and `b` are chunk aligned offsets in bounds and `a < b`, so the swapped chunks never overlap.
            while a < b {
                std::ptr::swap_nonoverlapping(ptr.add(a), ptr.add(b), chunk_len);
                a += chunk_len;
                b -= chunk_len;
            }
        }
    }
}