1#![cfg_attr(not(ipc), allow(unused))]
2
3use std::{
4 cell::Cell,
5 fmt, fs,
6 io::{self, Read, Write},
7 ops,
8 path::{Path, PathBuf},
9 pin::Pin,
10 sync::{Arc, Weak},
11};
12
13use futures_lite::AsyncReadExt;
14#[cfg(ipc)]
15use ipc_channel::ipc::IpcSharedMemory;
16use parking_lot::Mutex;
17use serde::{Deserialize, Serialize, de::VariantAccess};
18use zng_app_context::RunOnDrop;
19
/// Immutable byte buffer that dereferences to `[u8]`.
///
/// Cloning only clones the inner `Arc`, all clones share the same storage.
#[derive(Clone)]
#[repr(transparent)]
pub struct IpcBytes(Arc<IpcBytesData>);
/// Storage that backs an [`IpcBytes`].
enum IpcBytesData {
    /// Bytes held in a regular heap vector.
    Heap(Vec<u8>),
    /// Bytes held in an `ipc_channel` shared memory region.
    #[cfg(ipc)]
    AnonMemMap(IpcSharedMemory),
    /// Bytes held in a memory mapped file.
    #[cfg(ipc)]
    MemMap(IpcMemMap),
}
/// Read-only memory map of a named file.
#[cfg(ipc)]
struct IpcMemMap {
    /// Path of the mapped file.
    name: PathBuf,
    /// Byte range of the file associated with this value, serialized together with `name`.
    range: ops::Range<usize>,
    /// `true` for files opened with `open_memmap*`, these are not deleted on drop.
    is_custom: bool,
    /// The mapping, constructors always set it, it is only taken on drop.
    map: Option<memmap2::Mmap>,
    /// File handle that holds a shared lock, constructors always set it, it is only taken on drop.
    read_handle: Option<fs::File>,
}
61impl fmt::Debug for IpcBytes {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 write!(f, "IpcBytes(<{} bytes>)", self.len())
64 }
65}
66impl ops::Deref for IpcBytes {
67 type Target = [u8];
68
69 fn deref(&self) -> &Self::Target {
70 match &*self.0 {
71 IpcBytesData::Heap(i) => i,
72 #[cfg(ipc)]
73 IpcBytesData::AnonMemMap(m) => m,
74 #[cfg(ipc)]
75 IpcBytesData::MemMap(f) => f.map.as_ref().unwrap(),
76 }
77 }
78}
79
80impl IpcBytes {
81 pub fn empty() -> Self {
83 IpcBytes(Arc::new(IpcBytesData::Heap(vec![])))
84 }
85}
86impl IpcBytes {
88 pub async fn new_writer() -> IpcBytesWriter {
90 IpcBytesWriter {
91 inner: blocking::Unblock::new(Self::new_writer_blocking()),
92 }
93 }
94
95 pub async fn new_mut(len: usize) -> io::Result<IpcBytesMut> {
97 #[cfg(ipc)]
98 if len <= Self::INLINE_MAX {
99 Ok(IpcBytesMut {
100 inner: IpcBytesMutInner::Heap(vec![0; len]),
101 })
102 } else if len <= Self::UNNAMED_MAX {
103 Ok(IpcBytesMut {
104 inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
105 })
106 } else {
107 blocking::unblock(move || Self::new_mut_blocking(len)).await
108 }
109
110 #[cfg(not(ipc))]
111 {
112 Ok(IpcBytesMut {
113 inner: IpcBytesMutInner::Heap(vec![0; len]),
114 })
115 }
116 }
117
118 pub async fn from_vec(data: Vec<u8>) -> io::Result<Self> {
120 blocking::unblock(move || Self::from_vec_blocking(data)).await
121 }
122
123 pub async fn from_read(data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
125 #[cfg(ipc)]
126 {
127 Self::from_read_ipc(data).await
128 }
129 #[cfg(not(ipc))]
130 {
131 let mut data = data;
132 let mut buf = vec![];
133 data.read_to_end(&mut buf).await;
134 Self::from_vec(buf).await
135 }
136 }
    /// Reads `data` to the end, the storage is selected as the total length becomes known.
    ///
    /// Up to `INLINE_MAX` bytes end in a heap vector, up to `UNNAMED_MAX` bytes end in shared memory,
    /// anything larger is streamed to a memory mapped file.
    #[cfg(ipc)]
    async fn from_read_ipc(mut data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
        // one byte more than the heap limit, filling the buffer proves the data does not fit inline
        let mut buf = vec![0u8; Self::INLINE_MAX + 1];
        let mut len = 0;

        loop {
            match data.read(&mut buf[len..]).await {
                Ok(l) => {
                    if l == 0 {
                        // EOF within the heap limit
                        buf.truncate(len);
                        return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
                    } else {
                        len += l;
                        if len == Self::INLINE_MAX + 1 {
                            break;
                        }
                    }
                }
                Err(e) => match e.kind() {
                    io::ErrorKind::WouldBlock => continue,
                    _ => return Err(e),
                },
            }
        }

        // same strategy for the shared memory limit, `len` bytes are already read
        buf.resize(Self::UNNAMED_MAX + 1, 0);
        loop {
            match data.read(&mut buf[len..]).await {
                Ok(l) => {
                    if l == 0 {
                        // EOF within the shared memory limit
                        return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
                    } else {
                        len += l;
                        if len == Self::UNNAMED_MAX + 1 {
                            break;
                        }
                    }
                }
                Err(e) => match e.kind() {
                    io::ErrorKind::WouldBlock => continue,
                    _ => return Err(e),
                },
            }
        }

        // `buf` is completely filled here, write it and then the rest of the stream to a file
        Self::new_memmap(async |m| {
            use futures_lite::AsyncWriteExt as _;

            m.write_all(&buf).await?;
            crate::io::copy(data, m).await?;
            Ok(())
        })
        .await
    }
198
199 pub async fn from_file(file: PathBuf) -> io::Result<Self> {
201 #[cfg(ipc)]
202 {
203 let mut file = crate::fs::File::open(file).await?;
204 let len = file.metadata().await?.len();
205 if len <= Self::UNNAMED_MAX as u64 {
206 let mut buf = vec![0u8; len as usize];
207 file.read_exact(&mut buf).await?;
208 Self::from_vec_blocking(buf)
209 } else {
210 Self::new_memmap(async move |m| {
211 crate::io::copy(&mut file, m).await?;
212 Ok(())
213 })
214 .await
215 }
216 }
217 #[cfg(not(ipc))]
218 {
219 let mut file = crate::fs::File::open(file).await?;
220 let mut buf = vec![];
221 file.read_to_end(&mut buf).await?;
222 Self::from_vec_blocking(buf)
223 }
224 }
225
226 #[cfg(ipc)]
231 pub async fn new_memmap(write: impl AsyncFnOnce(&mut crate::fs::File) -> io::Result<()>) -> io::Result<Self> {
232 let (name, file) = blocking::unblock(Self::create_memmap).await?;
233 let mut file = crate::fs::File::from(file);
234 write(&mut file).await?;
235
236 let mut permissions = file.metadata().await?.permissions();
237 permissions.set_readonly(true);
238 #[cfg(unix)]
239 {
240 use std::os::unix::fs::PermissionsExt;
241 permissions.set_mode(0o400);
242 }
243 file.set_permissions(permissions).await?;
244
245 blocking::unblock(move || {
246 drop(file);
247 let map = IpcMemMap::read(name, None)?;
248 Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
249 })
250 .await
251 }
252
253 #[cfg(ipc)]
266 pub async unsafe fn open_memmap(file: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
267 blocking::unblock(move || {
268 unsafe { Self::open_memmap_blocking(file, range) }
270 })
271 .await
272 }
273
274 pub fn ptr_eq(&self, other: &Self) -> bool {
276 let a = &self[..];
277 let b = &other[..];
278 (std::ptr::eq(a, b) && a.len() == b.len()) || (a.is_empty() && b.is_empty())
279 }
280
    /// Largest length (64 KiB) that is stored in a heap vector.
    #[cfg(ipc)]
    const INLINE_MAX: usize = 64 * 1024;
    /// Largest length (128 MiB) that is stored in shared memory, larger data goes to a memory mapped file.
    #[cfg(ipc)]
    const UNNAMED_MAX: usize = 128 * 1024 * 1024;
}
286
287impl IpcBytes {
289 pub fn new_writer_blocking() -> IpcBytesWriterBlocking {
291 IpcBytesWriterBlocking {
292 #[cfg(ipc)]
293 heap_buf: vec![],
294 #[cfg(ipc)]
295 memmap: None,
296
297 #[cfg(not(ipc))]
298 heap_buf: std::io::Cursor::new(vec![]),
299 }
300 }
301
    /// Allocates a zeroed mutable buffer of `len` bytes.
    ///
    /// In `ipc` builds the storage is selected by length: heap up to `INLINE_MAX`, shared memory up to
    /// `UNNAMED_MAX`, otherwise a new file of `len` bytes is created, locked and memory mapped for writing.
    /// Without `ipc` the storage is always the heap.
    pub fn new_mut_blocking(len: usize) -> io::Result<IpcBytesMut> {
        #[cfg(ipc)]
        if len <= Self::INLINE_MAX {
            Ok(IpcBytesMut {
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        } else if len <= Self::UNNAMED_MAX {
            Ok(IpcBytesMut {
                inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
            })
        } else {
            let (name, file) = Self::create_memmap()?;
            // exclusive lock, held by `write_handle` while the buffer is mutable
            file.lock()?;
            #[cfg(unix)]
            {
                // owner read+write only
                let mut permissions = file.metadata()?.permissions();
                use std::os::unix::fs::PermissionsExt;
                permissions.set_mode(0o600);
                file.set_permissions(permissions)?;
            }
            // the file must have the final length before it is mapped
            file.set_len(len as u64)?;
            // NOTE(review): presumably sound because the file was just created by this process and
            // is exclusively locked above, confirm against the `memmap2` safety requirements.
            let map = unsafe { memmap2::MmapMut::map_mut(&file) }?;
            Ok(IpcBytesMut {
                inner: IpcBytesMutInner::MemMap {
                    name,
                    map,
                    write_handle: file,
                },
            })
        }
        #[cfg(not(ipc))]
        {
            Ok(IpcBytesMut {
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        }
    }
341
342 pub fn from_slice_blocking(data: &[u8]) -> io::Result<Self> {
344 #[cfg(ipc)]
345 {
346 if data.len() <= Self::INLINE_MAX {
347 Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
348 } else if data.len() <= Self::UNNAMED_MAX {
349 Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(data)))))
350 } else {
351 Self::new_memmap_blocking(|m| m.write_all(data))
352 }
353 }
354 #[cfg(not(ipc))]
355 {
356 Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
357 }
358 }
359
360 pub fn from_vec_blocking(data: Vec<u8>) -> io::Result<Self> {
362 #[cfg(ipc)]
363 {
364 if data.len() <= Self::INLINE_MAX {
365 Ok(Self(Arc::new(IpcBytesData::Heap(data))))
366 } else {
367 Self::from_slice_blocking(&data)
368 }
369 }
370 #[cfg(not(ipc))]
371 {
372 Ok(Self(Arc::new(IpcBytesData::Heap(data))))
373 }
374 }
375
376 pub fn from_read_blocking(data: &mut dyn io::Read) -> io::Result<Self> {
378 #[cfg(ipc)]
379 {
380 Self::from_read_blocking_ipc(data)
381 }
382 #[cfg(not(ipc))]
383 {
384 let mut buf = vec![];
385 data.read_to_end(&mut buf)?;
386 Self::from_vec_blocking(buf)
387 }
388 }
389 #[cfg(ipc)]
390 fn from_read_blocking_ipc(data: &mut dyn io::Read) -> io::Result<Self> {
391 let mut buf = vec![0u8; Self::INLINE_MAX + 1];
392 let mut len = 0;
393
394 loop {
396 match data.read(&mut buf[len..]) {
397 Ok(l) => {
398 if l == 0 {
399 buf.truncate(len);
401 return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
402 } else {
403 len += l;
404 if len == Self::INLINE_MAX + 1 {
405 break;
407 }
408 }
409 }
410 Err(e) => match e.kind() {
411 io::ErrorKind::WouldBlock => continue,
412 _ => return Err(e),
413 },
414 }
415 }
416
417 buf.resize(Self::UNNAMED_MAX + 1, 0);
419 loop {
420 match data.read(&mut buf[len..]) {
421 Ok(l) => {
422 if l == 0 {
423 return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
425 } else {
426 len += l;
427 if len == Self::UNNAMED_MAX + 1 {
428 break;
430 }
431 }
432 }
433 Err(e) => match e.kind() {
434 io::ErrorKind::WouldBlock => continue,
435 _ => return Err(e),
436 },
437 }
438 }
439
440 Self::new_memmap_blocking(|m| {
442 m.write_all(&buf)?;
443 io::copy(data, m)?;
444 Ok(())
445 })
446 }
447
448 pub fn from_file_blocking(file: &Path) -> io::Result<Self> {
450 #[cfg(ipc)]
451 {
452 let mut file = fs::File::open(file)?;
453 let len = file.metadata()?.len();
454 if len <= Self::UNNAMED_MAX as u64 {
455 let mut buf = vec![0u8; len as usize];
456 file.read_exact(&mut buf)?;
457 Self::from_vec_blocking(buf)
458 } else {
459 Self::new_memmap_blocking(|m| {
460 io::copy(&mut file, m)?;
461 Ok(())
462 })
463 }
464 }
465 #[cfg(not(ipc))]
466 {
467 let mut file = fs::File::open(file)?;
468 let mut buf = vec![];
469 file.read_to_end(&mut buf)?;
470 Self::from_vec_blocking(buf)
471 }
472 }
473
474 #[cfg(ipc)]
479 pub fn new_memmap_blocking(write: impl FnOnce(&mut fs::File) -> io::Result<()>) -> io::Result<Self> {
480 let (name, mut file) = Self::create_memmap()?;
481 write(&mut file)?;
482 let mut permissions = file.metadata()?.permissions();
483 permissions.set_readonly(true);
484 #[cfg(unix)]
485 {
486 use std::os::unix::fs::PermissionsExt;
487 permissions.set_mode(0o400);
488 }
489 file.set_permissions(permissions)?;
490
491 drop(file);
492 let map = IpcMemMap::read(name, None)?;
493 Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
494 }
    /// Creates a new, uniquely named file for memory mapping.
    ///
    /// Files are named by a process wide counter inside the directory
    /// `<cache>/zng-task-ipc-mem/<process id>`. Returns the file path and the file open for read and write.
    #[cfg(ipc)]
    fn create_memmap() -> io::Result<(PathBuf, fs::File)> {
        // next file name, the lock is held for the entire function
        static MEMMAP_DIR: Mutex<usize> = Mutex::new(0);
        let mut count = MEMMAP_DIR.lock();

        if *count == 0 {
            // first file, register the storage cleanup to run on process exit
            zng_env::on_process_exit(|_| {
                IpcBytes::cleanup_memmap_storage();
            });
        }

        let dir = zng_env::cache("zng-task-ipc-mem").join(std::process::id().to_string());
        fs::create_dir_all(&dir)?;
        let mut name = dir.join(count.to_string());
        if *count < usize::MAX {
            *count += 1;
        } else {
            // counter exhausted, search for a name that is not in use
            for i in 0..usize::MAX {
                name = dir.join(i.to_string());
                if !name.exists() {
                    break;
                }
            }
            if name.exists() {
                return Err(io::Error::new(io::ErrorKind::StorageFull, ""));
            }
        };

        let file = fs::OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .truncate(true)
            .open(&name)?;
        Ok((name, file))
    }
533 #[cfg(ipc)]
534 fn cleanup_memmap_storage() {
535 if let Ok(dir) = fs::read_dir(zng_env::cache("zng-task-ipc-mem")) {
536 let entries: Vec<_> = dir.flatten().map(|e| e.path()).collect();
537 for entry in entries {
538 if entry.is_dir() {
539 fs::remove_dir_all(entry).ok();
540 }
541 }
542 }
543 }
544
545 #[cfg(ipc)]
558 pub unsafe fn open_memmap_blocking(file: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
559 let read_handle = fs::File::open(&file)?;
560 read_handle.lock_shared()?;
561 let len = read_handle.metadata()?.len();
562 if let Some(range) = &range
563 && len < range.end as u64
564 {
565 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "file length < range.end"));
566 }
567 let map = unsafe { memmap2::Mmap::map(&read_handle) }?;
569
570 let range = range.unwrap_or_else(|| 0..map.len());
571
572 Ok(Self(Arc::new(IpcBytesData::MemMap(IpcMemMap {
573 name: file,
574 range,
575 read_handle: Some(read_handle),
576 is_custom: true,
577 map: Some(map),
578 }))))
579 }
580}
581
582impl AsRef<[u8]> for IpcBytes {
583 fn as_ref(&self) -> &[u8] {
584 &self[..]
585 }
586}
587impl Default for IpcBytes {
588 fn default() -> Self {
589 Self::empty()
590 }
591}
592impl PartialEq for IpcBytes {
593 fn eq(&self, other: &Self) -> bool {
594 self.ptr_eq(other) || self[..] == other[..]
595 }
596}
#[cfg(ipc)]
impl IpcMemMap {
    /// Opens, locks for shared access and memory maps the file `name`.
    ///
    /// The `range` defaults to the entire file. The returned value is not custom, the file is
    /// deleted when it is dropped.
    ///
    /// # Errors
    ///
    /// Returns `InvalidData` if `range` is inverted or ends after the mapped length, and any error
    /// from opening, locking or mapping the file.
    fn read(name: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
        let read_handle = fs::File::open(&name)?;
        read_handle.lock_shared()?;
        // NOTE(review): presumably sound because the file is managed by this module and the
        // shared lock is held, confirm against the `memmap2` safety requirements.
        let map = unsafe { memmap2::Mmap::map(&read_handle) }?;

        let range = match range {
            Some(r) => {
                // the range can come from another process, do not trust it
                if r.start > r.end || r.end > map.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        "range not contained in memory mapped file",
                    ));
                }
                r
            }
            None => 0..map.len(),
        };

        Ok(IpcMemMap {
            name,
            range,
            is_custom: false,
            read_handle: Some(read_handle),
            map: Some(map),
        })
    }
}
#[cfg(ipc)]
impl Serialize for IpcMemMap {
    /// Serializes only the file name and range as a tuple, the bytes are not serialized.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let payload = (&self.name, self.range.clone());
        payload.serialize(serializer)
    }
}
#[cfg(ipc)]
impl<'de> Deserialize<'de> for IpcMemMap {
    /// Deserializes the file name and range tuple and maps the file with [`IpcMemMap::read`].
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let (name, range): (PathBuf, ops::Range<usize>) = Deserialize::deserialize(deserializer)?;
        match IpcMemMap::read(name, Some(range)) {
            Ok(map) => Ok(map),
            Err(e) => Err(serde::de::Error::custom(format!("cannot load ipc memory map file, {e}"))),
        }
    }
}
#[cfg(ipc)]
impl Drop for IpcMemMap {
    /// Unmaps and closes the file, then deletes it unless it is a custom file.
    fn drop(&mut self) {
        // release the mapping and the handle before trying to delete
        drop(self.map.take());
        drop(self.read_handle.take());
        if self.is_custom {
            return;
        }
        std::fs::remove_file(&self.name).ok();
    }
}
645
impl Serialize for IpcBytes {
    /// Serializes as a newtype variant of an enum named `"IpcBytes"`.
    ///
    /// Outside of [`is_ipc_serialization`] (and in builds without `ipc`) the bytes are always
    /// serialized as the `Heap` variant. During IPC serialization the variant matches the storage.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        #[cfg(ipc)]
        {
            if is_ipc_serialization() {
                match &*self.0 {
                    IpcBytesData::Heap(b) => serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&b[..])),
                    IpcBytesData::AnonMemMap(b) => serializer.serialize_newtype_variant("IpcBytes", 1, "AnonMemMap", b),
                    IpcBytesData::MemMap(b) => {
                        // the receiver signals in this channel after it has mapped the file
                        let (sender, mut recv) = crate::channel::ipc_unbounded::<()>()
                            .map_err(|e| serde::ser::Error::custom(format!("cannot serialize memmap bytes for ipc, {e}")))?;

                        let r = serializer.serialize_newtype_variant("IpcBytes", 2, "MemMap", &(b, sender))?;
                        // keep a clone alive until the signal (or a channel error) is received,
                        // dropping the last clone deletes the file
                        let hold = self.clone();
                        crate::spawn_wait(move || {
                            if let Err(e) = recv.recv_blocking() {
                                tracing::error!("IpcBytes memmap completion signal not received, {e}")
                            }
                            drop(hold);
                        });
                        Ok(r)
                    }
                }
            } else {
                serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&self[..]))
            }
        }
        #[cfg(not(ipc))]
        {
            serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&self[..]))
        }
    }
}
impl<'de> Deserialize<'de> for IpcBytes {
    /// Deserializes the enum written by the `Serialize` implementation.
    ///
    /// `Heap` payloads are converted with `from_slice_blocking`/`from_vec_blocking`. In `ipc` builds the
    /// `AnonMemMap` and `MemMap` variants are also accepted.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // variant identifiers, in the same order they are declared for `deserialize_enum`
        #[derive(Deserialize)]
        enum VariantId {
            Heap,
            #[cfg(ipc)]
            AnonMemMap,
            #[cfg(ipc)]
            MemMap,
        }

        // selects how the payload is deserialized from the variant
        struct EnumVisitor;
        impl<'de> serde::de::Visitor<'de> for EnumVisitor {
            type Value = IpcBytes;

            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                write!(f, "IpcBytes variant")
            }

            fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
            where
                A: serde::de::EnumAccess<'de>,
            {
                let (variant, access) = data.variant::<VariantId>()?;
                match variant {
                    VariantId::Heap => access.newtype_variant_seed(ByteSliceVisitor),
                    #[cfg(ipc)]
                    VariantId::AnonMemMap => Ok(IpcBytes(Arc::new(IpcBytesData::AnonMemMap(access.newtype_variant()?)))),
                    #[cfg(ipc)]
                    VariantId::MemMap => {
                        // deserializing `IpcMemMap` maps the file, after that the sender is notified
                        let (memmap, mut completion_sender): (IpcMemMap, crate::channel::IpcSender<()>) = access.newtype_variant()?;
                        completion_sender.send_blocking(()).map_err(|e| {
                            serde::de::Error::custom(format!("cannot deserialize memmap bytes, completion signal failed, {e}"))
                        })?;
                        Ok(IpcBytes(Arc::new(IpcBytesData::MemMap(memmap))))
                    }
                }
            }
        }
        // visitor and seed for the `Heap` payload, accepts borrowed or owned bytes
        struct ByteSliceVisitor;
        impl<'de> serde::de::Visitor<'de> for ByteSliceVisitor {
            type Value = IpcBytes;

            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                write!(f, "byte buffer")
            }

            fn visit_borrowed_bytes<E>(self, v: &'de [u8]) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                IpcBytes::from_slice_blocking(v).map_err(serde::de::Error::custom)
            }

            fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                IpcBytes::from_slice_blocking(v).map_err(serde::de::Error::custom)
            }

            fn visit_byte_buf<E>(self, v: Vec<u8>) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                IpcBytes::from_vec_blocking(v).map_err(serde::de::Error::custom)
            }
        }
        impl<'de> serde::de::DeserializeSeed<'de> for ByteSliceVisitor {
            type Value = IpcBytes;

            fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
            where
                D: serde::Deserializer<'de>,
            {
                deserializer.deserialize_bytes(ByteSliceVisitor)
            }
        }

        #[cfg(ipc)]
        {
            deserializer.deserialize_enum("IpcBytes", &["Heap", "AnonMemMap", "MemMap"], EnumVisitor)
        }
        #[cfg(not(ipc))]
        {
            deserializer.deserialize_enum("IpcBytes", &["Heap"], EnumVisitor)
        }
    }
}
776
/// Calls `serialize` with the IPC serialization flag enabled in the current thread.
///
/// The previous flag value is restored when the function exits, including by unwind.
#[cfg(ipc)]
pub fn with_ipc_serialization<R>(serialize: impl FnOnce() -> R) -> R {
    let previous = IPC_SERIALIZATION.replace(true);
    let _restore = RunOnDrop::new(move || IPC_SERIALIZATION.set(previous));
    serialize()
}
790
/// Returns `true` if the current thread is inside a [`with_ipc_serialization`] call.
#[cfg(ipc)]
pub fn is_ipc_serialization() -> bool {
    IPC_SERIALIZATION.with(|flag| flag.get())
}
796
#[cfg(ipc)]
thread_local! {
    /// Per-thread flag set by `with_ipc_serialization` and read by `is_ipc_serialization`.
    static IPC_SERIALIZATION: Cell<bool> = const { Cell::new(false) };
}
801
802impl IpcBytes {
803 pub fn downgrade(&self) -> WeakIpcBytes {
807 WeakIpcBytes(Arc::downgrade(&self.0))
808 }
809}
810
/// Weak reference to the storage of an [`IpcBytes`], created by [`IpcBytes::downgrade`].
pub struct WeakIpcBytes(Weak<IpcBytesData>);
813impl WeakIpcBytes {
814 pub fn upgrade(&self) -> Option<IpcBytes> {
816 self.0.upgrade().map(IpcBytes)
817 }
818
819 pub fn strong_count(&self) -> usize {
821 self.0.strong_count()
822 }
823}
824
/// Async writer that collects bytes for an [`IpcBytes`], created by [`IpcBytes::new_writer`].
pub struct IpcBytesWriter {
    /// The blocking writer, operated through `blocking::Unblock`.
    inner: blocking::Unblock<IpcBytesWriterBlocking>,
}
831impl IpcBytesWriter {
832 pub async fn finish(self) -> std::io::Result<IpcBytes> {
834 let inner = self.inner.into_inner().await;
835 blocking::unblock(move || inner.finish()).await
836 }
837
838 pub async fn finish_mut(self) -> std::io::Result<IpcBytesMut> {
840 let inner = self.inner.into_inner().await;
841 blocking::unblock(move || inner.finish_mut()).await
842 }
843}
844impl crate::io::AsyncWrite for IpcBytesWriter {
845 fn poll_write(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8]) -> std::task::Poll<io::Result<usize>> {
846 crate::io::AsyncWrite::poll_write(Pin::new(&mut Pin::get_mut(self).inner), cx, buf)
847 }
848
849 fn poll_flush(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
850 crate::io::AsyncWrite::poll_flush(Pin::new(&mut Pin::get_mut(self).inner), cx)
851 }
852
853 fn poll_close(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
854 crate::io::AsyncWrite::poll_close(Pin::new(&mut Pin::get_mut(self).inner), cx)
855 }
856}
857impl crate::io::AsyncSeek for IpcBytesWriter {
858 fn poll_seek(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, pos: io::SeekFrom) -> std::task::Poll<io::Result<u64>> {
859 crate::io::AsyncSeek::poll_seek(Pin::new(&mut Pin::get_mut(self).inner), cx, pos)
860 }
861}
862
/// Blocking writer that collects bytes for an [`IpcBytes`], created by [`IpcBytes::new_writer_blocking`].
pub struct IpcBytesWriterBlocking {
    /// Bytes buffered in memory that are not yet written to the `memmap` file.
    #[cfg(ipc)]
    heap_buf: Vec<u8>,
    /// Path and write handle of the backing file, allocated when the data grows too large or on seek.
    #[cfg(ipc)]
    memmap: Option<(PathBuf, std::fs::File)>,

    /// All bytes written, builds without `ipc` only use the heap.
    #[cfg(not(ipc))]
    heap_buf: std::io::Cursor<Vec<u8>>,
}
875impl IpcBytesWriterBlocking {
876 pub fn finish(self) -> std::io::Result<IpcBytes> {
878 let m = self.finish_mut()?;
879 m.finish_blocking()
880 }
881
    /// Finishes writing, returns the collected bytes as mutable [`IpcBytesMut`].
    ///
    /// In `ipc` builds the storage is the memory mapped file, if one was allocated, otherwise it is
    /// shared memory for more than `INLINE_MAX` bytes and the heap buffer for less.
    pub fn finish_mut(mut self) -> std::io::Result<IpcBytesMut> {
        // moves any bytes still buffered to the file, if there is one
        self.flush()?;
        #[cfg(ipc)]
        {
            let inner = match self.memmap {
                Some((name, write_handle)) => {
                    // NOTE(review): presumably sound because the file was created and exclusively
                    // locked by this writer, confirm against the `memmap2` safety requirements.
                    let map = unsafe { memmap2::MmapMut::map_mut(&write_handle) }?;
                    IpcBytesMutInner::MemMap { name, map, write_handle }
                }
                None => {
                    if self.heap_buf.len() > IpcBytes::INLINE_MAX {
                        IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_bytes(&self.heap_buf))
                    } else {
                        IpcBytesMutInner::Heap(self.heap_buf)
                    }
                }
            };
            Ok(IpcBytesMut { inner })
        }
        #[cfg(not(ipc))]
        {
            let inner = IpcBytesMutInner::Heap(self.heap_buf.into_inner());
            Ok(IpcBytesMut { inner })
        }
    }
909
    /// Ensures the backing file exists and moves all buffered bytes to it.
    ///
    /// The file is created and exclusively locked on the first call. After this returns `Ok`
    /// the `memmap` field is `Some` and `heap_buf` is empty.
    #[cfg(ipc)]
    fn alloc_memmap_file(&mut self) -> io::Result<()> {
        if self.memmap.is_none() {
            let (name, file) = IpcBytes::create_memmap()?;
            file.lock()?;
            #[cfg(unix)]
            {
                // owner read+write only
                let mut permissions = file.metadata()?.permissions();
                use std::os::unix::fs::PermissionsExt;
                permissions.set_mode(0o600);
                file.set_permissions(permissions)?;
            }
            self.memmap = Some((name, file));
        }
        let file = &mut self.memmap.as_mut().unwrap().1;

        file.write_all(&self.heap_buf)?;
        self.heap_buf.clear();
        Ok(())
    }
931}
932impl std::io::Write for IpcBytesWriterBlocking {
933 fn write(&mut self, write_buf: &[u8]) -> io::Result<usize> {
934 #[cfg(ipc)]
935 {
936 if self.heap_buf.len() + write_buf.len() > IpcBytes::UNNAMED_MAX {
937 self.alloc_memmap_file()?;
939
940 if write_buf.len() > IpcBytes::UNNAMED_MAX {
941 self.memmap.as_mut().unwrap().1.write_all(write_buf)?;
943 } else {
944 self.heap_buf.extend_from_slice(write_buf);
945 }
946 } else {
947 if self.memmap.is_none() {
948 self.heap_buf
950 .reserve_exact((self.heap_buf.capacity().max(1024) * 2).min(IpcBytes::UNNAMED_MAX));
951 }
952 self.heap_buf.extend_from_slice(write_buf);
953 }
954
955 Ok(write_buf.len())
956 }
957
958 #[cfg(not(ipc))]
959 {
960 std::io::Write::write(&mut self.heap_buf, write_buf)
961 }
962 }
963
    /// Moves buffered bytes to the backing file and flushes it.
    ///
    /// Does nothing if no file is allocated (always the case in builds without `ipc`).
    fn flush(&mut self) -> io::Result<()> {
        #[cfg(ipc)]
        if let Some((_, file)) = &mut self.memmap {
            if !self.heap_buf.is_empty() {
                file.write_all(&self.heap_buf)?;
                self.heap_buf.clear();
            }
            file.flush()?;
        }
        Ok(())
    }
975}
impl std::io::Seek for IpcBytesWriterBlocking {
    /// Seeks in the written data.
    ///
    /// In `ipc` builds seeking always allocates the backing file, buffered bytes are written to it
    /// before the file is repositioned.
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        #[cfg(ipc)]
        {
            self.alloc_memmap_file()?;
            let (_, file) = self.memmap.as_mut().unwrap();
            // `alloc_memmap_file` already writes and clears the buffer, this is a second guard
            if !self.heap_buf.is_empty() {
                file.write_all(&self.heap_buf)?;
                self.heap_buf.clear();
            }
            file.seek(pos)
        }
        #[cfg(not(ipc))]
        {
            std::io::Seek::seek(&mut self.heap_buf, pos)
        }
    }
}
994
/// Storage that backs an [`IpcBytesMut`].
enum IpcBytesMutInner {
    /// Bytes held in a regular heap vector.
    Heap(Vec<u8>),
    /// Bytes held in an `ipc_channel` shared memory region.
    #[cfg(ipc)]
    AnonMemMap(IpcSharedMemory),
    /// Bytes held in a writable memory mapped file.
    #[cfg(ipc)]
    MemMap {
        /// Path of the mapped file.
        name: PathBuf,
        /// The writable mapping.
        map: memmap2::MmapMut,
        /// Handle the file was mapped from.
        write_handle: std::fs::File,
    },
}
1006
/// Mutable byte buffer that dereferences to `[u8]` and can be converted to [`IpcBytes`] by `finish*`.
pub struct IpcBytesMut {
    /// The storage, replaced by an empty heap vector when the buffer is finished.
    inner: IpcBytesMutInner,
}
1013impl ops::Deref for IpcBytesMut {
1014 type Target = [u8];
1015
1016 fn deref(&self) -> &Self::Target {
1017 match &self.inner {
1018 IpcBytesMutInner::Heap(v) => &v[..],
1019 #[cfg(ipc)]
1020 IpcBytesMutInner::AnonMemMap(m) => &m[..],
1021 #[cfg(ipc)]
1022 IpcBytesMutInner::MemMap { map, .. } => &map[..],
1023 }
1024 }
1025}
impl ops::DerefMut for IpcBytesMut {
    /// Mutably borrows all bytes of the storage.
    fn deref_mut(&mut self) -> &mut Self::Target {
        match &mut self.inner {
            IpcBytesMutInner::Heap(v) => &mut v[..],
            #[cfg(ipc)]
            IpcBytesMutInner::AnonMemMap(m) => {
                // NOTE(review): `IpcSharedMemory::deref_mut` is unsafe, presumably sound here because
                // the memory is owned by this value and not shared until `finish*`, confirm.
                unsafe { m.deref_mut() }
            }
            #[cfg(ipc)]
            IpcBytesMutInner::MemMap { map, .. } => &mut map[..],
        }
    }
}
1040impl fmt::Debug for IpcBytesMut {
1041 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1042 write!(f, "IpcBytesMut(<{} bytes>)", self.len())
1043 }
1044}
1045impl IpcBytesMut {
1046 pub async fn from_vec(buf: Vec<u8>) -> io::Result<Self> {
1048 #[cfg(ipc)]
1049 if buf.len() <= IpcBytes::INLINE_MAX {
1050 Ok(Self {
1051 inner: IpcBytesMutInner::Heap(buf),
1052 })
1053 } else {
1054 blocking::unblock(move || {
1055 let mut b = IpcBytes::new_mut_blocking(buf.len())?;
1056 b[..].copy_from_slice(&buf);
1057 Ok(b)
1058 })
1059 .await
1060 }
1061 #[cfg(not(ipc))]
1062 {
1063 Ok(Self {
1064 inner: IpcBytesMutInner::Heap(buf),
1065 })
1066 }
1067 }
1068
1069 pub async fn finish(mut self) -> io::Result<IpcBytes> {
1071 let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
1072 IpcBytesMutInner::Heap(v) => IpcBytesData::Heap(v),
1073 #[cfg(ipc)]
1074 IpcBytesMutInner::AnonMemMap(m) => IpcBytesData::AnonMemMap(m),
1075 #[cfg(ipc)]
1076 IpcBytesMutInner::MemMap { name, map, write_handle } => {
1077 blocking::unblock(move || Self::finish_memmap(name, map, write_handle)).await?
1078 }
1079 };
1080 Ok(IpcBytes(Arc::new(data)))
1081 }
1082
    /// Converts a writable memory mapped file to read-only storage.
    ///
    /// Releases the write lock, makes the mapping and the file read-only, closes the write handle
    /// and reopens the file with a shared lock. The range covers the entire mapping.
    #[cfg(ipc)]
    fn finish_memmap(name: PathBuf, map: memmap2::MmapMut, write_handle: fs::File) -> Result<IpcBytesData, io::Error> {
        let len = map.len();
        write_handle.unlock()?;
        let map = map.make_read_only()?;
        let mut permissions = write_handle.metadata()?.permissions();
        permissions.set_readonly(true);
        #[cfg(unix)]
        {
            // owner read only
            use std::os::unix::fs::PermissionsExt;
            permissions.set_mode(0o400);
        }
        write_handle.set_permissions(permissions)?;
        drop(write_handle);
        let read_handle = std::fs::File::open(&name)?;
        read_handle.lock_shared()?;
        Ok(IpcBytesData::MemMap(IpcMemMap {
            name,
            range: 0..len,
            // the file is deleted when the bytes are dropped
            is_custom: false,
            map: Some(map),
            read_handle: Some(read_handle),
        }))
    }
1107}
1108impl IpcBytesMut {
1109 pub fn from_vec_blocking(buf: Vec<u8>) -> io::Result<Self> {
1111 #[cfg(ipc)]
1112 if buf.len() <= IpcBytes::INLINE_MAX {
1113 Ok(Self {
1114 inner: IpcBytesMutInner::Heap(buf),
1115 })
1116 } else {
1117 let mut b = IpcBytes::new_mut_blocking(buf.len())?;
1118 b[..].copy_from_slice(&buf);
1119 Ok(b)
1120 }
1121 #[cfg(not(ipc))]
1122 {
1123 Ok(Self {
1124 inner: IpcBytesMutInner::Heap(buf),
1125 })
1126 }
1127 }
1128
1129 pub fn finish_blocking(mut self) -> io::Result<IpcBytes> {
1131 let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
1132 IpcBytesMutInner::Heap(v) => IpcBytesData::Heap(v),
1133 #[cfg(ipc)]
1134 IpcBytesMutInner::AnonMemMap(m) => IpcBytesData::AnonMemMap(m),
1135 #[cfg(ipc)]
1136 IpcBytesMutInner::MemMap { name, map, write_handle } => Self::finish_memmap(name, map, write_handle)?,
1137 };
1138 Ok(IpcBytes(Arc::new(data)))
1139 }
1140}
#[cfg(ipc)]
impl Drop for IpcBytesMut {
    /// Deletes the backing file of a memory mapped buffer that was dropped without being finished.
    fn drop(&mut self) {
        let storage = std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![]));
        let IpcBytesMutInner::MemMap { name, map, write_handle } = storage else {
            return;
        };
        // unmap and close before deleting
        drop(map);
        drop(write_handle);
        std::fs::remove_file(name).ok();
    }
}
1150}