1#![doc(html_favicon_url = "https://zng-ui.github.io/res/zng-logo-icon.png")]
2#![doc(html_logo_url = "https://zng-ui.github.io/res/zng-logo.png")]
3#![doc = include_str!(concat!("../", std::env!("CARGO_PKG_README")))]
15#![warn(unused_extern_crates)]
16#![warn(missing_docs)]
17
18use std::{path::PathBuf, pin::Pin};
19
20use zng_app::{
21 update::UPDATES,
22 view_process::{
23 VIEW_PROCESS, VIEW_PROCESS_INITED_EVENT, ViewAudioHandle,
24 raw_events::{RAW_AUDIO_DECODE_ERROR_EVENT, RAW_AUDIO_DECODED_EVENT, RAW_AUDIO_METADATA_DECODED_EVENT},
25 },
26};
27use zng_app_context::app_local;
28use zng_clone_move::clmv;
29use zng_task::channel::IpcBytes;
30use zng_txt::ToTxt;
31use zng_unique_id::{IdEntry, IdMap};
32use zng_unit::ByteLength;
33use zng_var::{ResponseVar, Var, VarHandle, response_var, var};
34use zng_view_api::audio::{AudioDecoded, AudioMetadata, AudioRequest};
35
36mod types;
37pub use types::*;
38
39mod output;
40pub use output::*;
41
app_local! {
    // State shared by every `AUDIOS` method and by the loader functions in this file.
    static AUDIOS_SV: AudiosService = AudiosService::new();
    // Extensions added by `AUDIOS.extend`, appended in the order they are registered.
    static AUDIOS_EXTENSIONS: Vec<Box<dyn AudiosExtension>> = vec![];
}
46
/// Backing state of the `AUDIOS` service.
struct AudiosService {
    /// When `true` audio requests are processed even if the view-process is not available.
    load_in_headless: Var<bool>,
    /// Limits applied to requests that do not provide their own.
    limits: Var<AudioLimits>,

    /// Cached audio variables, keyed by the hash of the request.
    cache: IdMap<AudioHash, AudioVar>,
    /// Weak references to the outputs opened by `AUDIOS.open_output`.
    outputs: IdMap<AudioOutputId, WeakAudioOutput>,
    /// Strong references to outputs, inserted by `AUDIOS.perm_output`.
    perm_outputs: IdMap<AudioOutputId, AudioOutput>,
}
55impl AudiosService {
56 pub fn new() -> Self {
57 Self {
58 load_in_headless: var(false),
59 limits: var(AudioLimits::default()),
60
61 cache: IdMap::new(),
62 outputs: IdMap::new(),
63 perm_outputs: IdMap::new(),
64 }
65 }
66}
67
/// Audio service.
///
/// Provides methods to request audio tracks from files, downloads or bytes, to manage the
/// audio cache, to register extensions and to open audio outputs.
pub struct AUDIOS;
76impl AUDIOS {
77 pub fn load_in_headless(&self) -> Var<bool> {
87 AUDIOS_SV.read().load_in_headless.clone()
88 }
89
90 pub fn limits(&self) -> Var<AudioLimits> {
92 AUDIOS_SV.read().limits.clone()
93 }
94
95 pub fn read(&self, path: impl Into<PathBuf>) -> AudioVar {
101 self.audio_impl(path.into().into(), AudioOptions::cache(), None)
102 }
103
104 #[cfg(feature = "http")]
113 pub fn download<U>(&self, uri: U, accept: Option<zng_txt::Txt>) -> AudioVar
114 where
115 U: TryInto<zng_task::http::Uri>,
116 <U as TryInto<zng_task::http::Uri>>::Error: ToTxt,
117 {
118 match uri.try_into() {
119 Ok(uri) => self.audio_impl(AudioSource::Download(uri, accept), AudioOptions::cache(), None),
120 Err(e) => zng_var::const_var(AudioTrack::new_error(e.to_txt())),
121 }
122 }
123
124 pub fn from_static(&self, data: &'static [u8], format: impl Into<AudioDataFormat>) -> AudioVar {
144 self.audio_impl((data, format.into()).into(), AudioOptions::cache(), None)
145 }
146
147 pub fn from_data(&self, data: IpcBytes, format: impl Into<AudioDataFormat>) -> AudioVar {
155 self.audio_impl((data, format.into()).into(), AudioOptions::cache(), None)
156 }
157
158 pub fn audio(&self, source: impl Into<AudioSource>, options: AudioOptions, limits: Option<AudioLimits>) -> AudioVar {
167 self.audio_impl(source.into(), options, limits)
168 }
169 fn audio_impl(&self, source: AudioSource, options: AudioOptions, limits: Option<AudioLimits>) -> AudioVar {
170 let r = var(AudioTrack::new_loading());
171 let ri = r.read_only();
172 UPDATES.once_update("AUDIOS.audio", move || {
173 audio(source, options, limits, r);
174 });
175 ri
176 }
177
178 pub fn audio_task<F>(&self, source: impl IntoFuture<IntoFuture = F>, options: AudioOptions, limits: Option<AudioLimits>) -> AudioVar
190 where
191 F: Future<Output = AudioSource> + Send + 'static,
192 {
193 self.audio_task_impl(Box::pin(source.into_future()), options, limits)
194 }
195 fn audio_task_impl(
196 &self,
197 source: Pin<Box<dyn Future<Output = AudioSource> + Send + 'static>>,
198 options: AudioOptions,
199 limits: Option<AudioLimits>,
200 ) -> AudioVar {
201 let r = var(AudioTrack::new_loading());
202 let ri = r.read_only();
203 zng_task::spawn(async move {
204 let source = source.await;
205 audio(source, options, limits, r);
206 });
207 ri
208 }
209
210 pub fn register(&self, key: Option<AudioHash>, audio: (ViewAudioHandle, AudioMetadata, AudioDecoded)) -> AudioVar {
218 let r = var(AudioTrack::new_loading());
219 let rr = r.read_only();
220 UPDATES.once_update("AUDIOS.register", move || {
221 audio_view(key, audio.0, audio.1, audio.2, None, r);
222 });
223 rr
224 }
225
226 pub fn clean(&self, key: AudioHash) {
231 UPDATES.once_update("AUDIOS.clean", move || {
232 if let IdEntry::Occupied(e) = AUDIOS_SV.write().cache.entry(key)
233 && e.get().strong_count() == 1
234 {
235 e.remove();
236 }
237 });
238 }
239
240 pub fn purge(&self, key: AudioHash) {
245 UPDATES.once_update("AUDIOS.purge", move || {
246 AUDIOS_SV.write().cache.remove(&key);
247 });
248 }
249
250 pub fn cache_key(&self, audio: &AudioTrack) -> Option<AudioHash> {
252 let key = audio.cache_key?;
253 if AUDIOS_SV.read().cache.contains_key(&key) {
254 Some(key)
255 } else {
256 None
257 }
258 }
259
260 pub fn is_cached(&self, audio: &AudioTrack) -> bool {
262 match &audio.cache_key {
263 Some(k) => AUDIOS_SV.read().cache.contains_key(k),
264 None => false,
265 }
266 }
267
268 pub fn clean_all(&self) {
270 UPDATES.once_update("AUDIOS.clean_all", || {
271 AUDIOS_SV.write().cache.retain(|_, v| v.strong_count() > 1);
272 });
273 }
274
275 pub fn purge_all(&self) {
280 UPDATES.once_update("AUDIOS.purge_all", || {
281 AUDIOS_SV.write().cache.clear();
282 });
283 }
284
285 pub fn extend(&self, extension: Box<dyn AudiosExtension>) {
289 UPDATES.once_update("AUDIOS.extend", move || {
290 AUDIOS_EXTENSIONS.write().push(extension);
291 });
292 }
293
294 pub fn available_formats(&self) -> Vec<AudioFormat> {
296 let mut formats = VIEW_PROCESS.info().audio.clone();
297
298 for ext in AUDIOS_EXTENSIONS.read().iter() {
299 ext.available_formats(&mut formats);
300 }
301
302 formats
303 }
304
305 #[cfg(feature = "http")]
306 fn http_accept(&self) -> zng_txt::Txt {
307 let mut s = String::new();
308 let mut sep = "";
309 for f in self.available_formats() {
310 for f in f.media_type_suffixes_iter() {
311 s.push_str(sep);
312 s.push_str("audio/");
313 s.push_str(f);
314 sep = ",";
315 }
316 }
317 s.into()
318 }
319}
320
321impl AUDIOS {
322 pub fn open_output(
333 &self,
334 id: impl Into<AudioOutputId>,
335 init: impl FnOnce(&mut AudioOutputOptions) + Send + 'static,
336 ) -> ResponseVar<AudioOutput> {
337 self.open_audio_out(id.into(), Box::new(init))
338 }
339
340 fn open_audio_out(&self, id: AudioOutputId, init: Box<dyn FnOnce(&mut AudioOutputOptions) + Send>) -> ResponseVar<AudioOutput> {
341 let (responder, r) = response_var();
342 UPDATES.once_update("AUDIOS.open_output", move || match AUDIOS_SV.write().outputs.entry(id) {
343 IdEntry::Occupied(mut e) => match e.get().upgrade() {
344 Some(r) => responder.respond(r),
345 None => {
346 let mut opt = AudioOutputOptions::default();
347 init(&mut opt);
348 let r = AudioOutput::open(id, opt);
349 e.insert(r.downgrade());
350 responder.respond(r);
351 }
352 },
353 IdEntry::Vacant(e) => {
354 let mut opt = AudioOutputOptions::default();
355 init(&mut opt);
356 let r = AudioOutput::open(id, opt);
357 e.insert(r.downgrade());
358 responder.respond(r);
359 }
360 });
361 r
362 }
363
364 pub(crate) fn perm_output(&self, output: &AudioOutput) {
365 AUDIOS_SV.write().perm_outputs.entry(output.id()).or_insert_with(|| output.clone());
366 }
367}
368
369fn audio(mut source: AudioSource, mut options: AudioOptions, limits: Option<AudioLimits>, r: Var<AudioTrack>) {
370 let limits = limits.unwrap_or_else(|| AUDIOS_SV.read().limits.get());
371
372 {
374 let mut exts = AUDIOS_EXTENSIONS.write();
375 if !exts.is_empty() {
376 tracing::trace!("process audio with {} extensions", exts.len());
377 }
378 for ext in exts.iter_mut() {
379 ext.audio(&limits, &mut source, &mut options);
380 }
381 }
382
383 let mut s = AUDIOS_SV.write();
385
386 if let AudioSource::Audio(var) = source {
387 var.set_bind(&r).perm();
389 r.hold(var).perm();
390 return;
391 }
392
393 if !VIEW_PROCESS.is_available() && !s.load_in_headless.get() {
394 tracing::debug!("ignoring audio request due headless mode");
395 return;
396 }
397
398 let key = source.hash128(&options).unwrap();
399
400 match options.cache_mode {
402 AudioCacheMode::Ignore => (),
403 AudioCacheMode::Cache => {
404 match s.cache.entry(key) {
405 IdEntry::Occupied(e) => {
406 let var = e.get();
408 var.set_bind(&r).perm();
409 r.hold(var.clone()).perm();
410 return;
411 }
412 IdEntry::Vacant(e) => {
413 e.insert(r.clone());
415 }
416 }
417 }
418 AudioCacheMode::Retry => {
419 match s.cache.entry(key) {
420 IdEntry::Occupied(mut e) => {
421 let var = e.get();
422 if var.with(AudioTrack::is_error) {
423 r.set_bind(var).perm();
428 var.hold(r.clone()).perm();
429
430 e.insert(r.clone());
432 } else {
433 var.set_bind(&r).perm();
435 r.hold(var.clone()).perm();
436 return;
437 }
438 }
439 IdEntry::Vacant(e) => {
440 e.insert(r.clone());
442 }
443 }
444 }
445 AudioCacheMode::Reload => {
446 match s.cache.entry(key) {
447 IdEntry::Occupied(mut e) => {
448 let var = e.get();
449 r.set_bind(var).perm();
450 var.hold(r.clone()).perm();
451
452 e.insert(r.clone());
453 }
454 IdEntry::Vacant(e) => {
455 e.insert(r.clone());
457 }
458 }
459 }
460 }
461 drop(s);
462
463 match source {
464 AudioSource::Read(path) => {
465 fn read(path: &PathBuf, limit: (&AudioSourceFilter<PathBuf>, ByteLength)) -> std::io::Result<IpcBytes> {
466 if !limit.0.allows(path) {
467 return Err(std::io::Error::new(
468 std::io::ErrorKind::PermissionDenied,
469 "file path no allowed by limit",
470 ));
471 }
472 let file = std::fs::File::open(path)?;
473 if file.metadata()?.len() > limit.1.bytes() as u64 {
474 return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "file length exceeds limit"));
475 }
476 IpcBytes::from_file_blocking(file)
477 }
478 let data_format = match path.extension() {
479 Some(ext) => AudioDataFormat::FileExtension(ext.to_string_lossy().to_txt()),
480 None => AudioDataFormat::Unknown,
481 };
482 zng_task::spawn_wait(move || match read(&path, (&limits.allow_path, limits.max_encoded_len)) {
483 Ok(data) => audio_data(false, Some(key), data_format, data, options, limits, r),
484 Err(e) => {
485 r.set(AudioTrack::new_error(e.to_txt()));
486 }
487 });
488 }
489 #[cfg(feature = "http")]
490 AudioSource::Download(uri, accept) => {
491 let accept = accept.unwrap_or_else(|| AUDIOS.http_accept());
492
493 use zng_task::http::*;
494 async fn download(
495 uri: Uri,
496 accept: zng_txt::Txt,
497 limit: (AudioSourceFilter<Uri>, ByteLength),
498 ) -> Result<(AudioDataFormat, IpcBytes), Error> {
499 if !limit.0.allows(&uri) {
500 return Err(Box::new(std::io::Error::new(
501 std::io::ErrorKind::PermissionDenied,
502 "uri no allowed by limit",
503 )));
504 }
505 let request = Request::get(uri)?.max_length(limit.1).header(header::ACCEPT, accept.as_str())?;
506 let mut response = send(request).await?;
507 let data_format = match response.header().get(&header::CONTENT_TYPE).and_then(|m| m.to_str().ok()) {
508 Some(m) => AudioDataFormat::MimeType(m.to_txt()),
509 None => AudioDataFormat::Unknown,
510 };
511 let data = response.body().await?;
512
513 Ok((data_format, data))
514 }
515
516 zng_task::spawn(async move {
517 match download(uri, accept, (limits.allow_uri.clone(), limits.max_encoded_len)).await {
518 Ok((fmt, data)) => {
519 audio_data(false, Some(key), fmt, data, options, limits, r);
520 }
521 Err(e) => r.set(AudioTrack::new_error(e.to_txt())),
522 }
523 });
524 }
525 AudioSource::Data(_, data, format) => audio_data(false, Some(key), format, data, options, limits, r),
526 _ => unreachable!(),
527 }
528}
529
530fn audio_data(
532 is_respawn: bool,
533 cache_key: Option<AudioHash>,
534 format: AudioDataFormat,
535 data: IpcBytes,
536 options: AudioOptions,
537 limits: AudioLimits,
538 r: Var<AudioTrack>,
539) {
540 if !is_respawn && let Some(key) = cache_key {
541 let mut exts = AUDIOS_EXTENSIONS.write();
542 if !exts.is_empty() {
543 tracing::trace!("process audio_data with {} extensions", exts.len());
544 }
545 for ext in exts.iter_mut() {
546 if let Some(replacement) = ext.audio_data(limits.max_decoded_len, &key, &data, &format, &options) {
547 replacement.set_bind(&r).perm();
548 r.hold(replacement).perm();
549
550 tracing::trace!("extension replaced audio_data");
551 return;
552 }
553 }
554 }
555
556 if !VIEW_PROCESS.is_available() {
557 tracing::debug!("ignoring audio view request after test load due to headless mode");
558 return;
559 }
560
561 let mut request = AudioRequest::new(format.clone(), data.clone(), limits.max_decoded_len.bytes() as u64);
562 request.tracks = options.tracks;
563
564 let try_gen = VIEW_PROCESS.generation();
565
566 match VIEW_PROCESS.add_audio(request) {
567 Ok(view_img) => audio_view(
568 cache_key,
569 view_img,
570 AudioMetadata::default(),
571 AudioDecoded::default(),
572 Some((format, data, options, limits)),
573 r,
574 ),
575 Err(_) => {
576 tracing::debug!("audio view request failed, will retry on respawn");
577
578 zng_task::spawn(async move {
579 VIEW_PROCESS_INITED_EVENT.wait_match(move |a| a.generation != try_gen).await;
580 audio_data(true, cache_key, format, data, options, limits, r);
581 });
582 }
583 }
584}
/// Sets `r` to a track for the view-process `handle` and hooks the raw audio events that update it.
///
/// * `cache_key` - Key stored in the track.
/// * `handle`, `meta`, `decoded` - View handle and the metadata and data already known for it.
/// * `respawn_data` - Inputs of the original `audio_data` request, if set the request is repeated
///   when `VIEW_PROCESS_INITED_EVENT` notifies while the audio is still decoding.
/// * `r` - Variable that receives the track and all updates.
///
/// If the track is already loaded only `audio_decoded` is called. If the track is not loaded and
/// the handle is a dummy an error is logged and no event is hooked.
///
/// NOTE(review): the hook closures appear to return `true` to stay subscribed and `false` to
/// unsubscribe, confirm against the event `hook` docs.
fn audio_view(
    cache_key: Option<AudioHash>,
    handle: ViewAudioHandle,
    meta: AudioMetadata,
    decoded: AudioDecoded,
    respawn_data: Option<(AudioDataFormat, IpcBytes, AudioOptions, AudioLimits)>,
    r: Var<AudioTrack>,
) {
    let aud = AudioTrack::new(cache_key, handle, meta, decoded);
    // flags read before the track is moved into the variable
    let is_loaded = aud.is_loaded();
    let is_dummy = aud.view_handle().is_dummy();
    r.set(aud);

    if is_loaded {
        // nothing more to decode
        audio_decoded(r);
        return;
    }

    if is_dummy {
        tracing::error!("tried to register dummy handle");
        return;
    }

    // repeats the decode request if the view-process inits again while still decoding
    let decoding_respawn_handle = if respawn_data.is_some() {
        let r_weak = r.downgrade();
        let mut respawn_data = respawn_data;
        VIEW_PROCESS_INITED_EVENT.hook(move |_| {
            if let Some(r) = r_weak.upgrade() {
                // the closure always returns `false`, so the data is presumably taken only once
                let (format, data, options, limits) = respawn_data.take().unwrap();
                audio_data(true, cache_key, format, data, options, limits, r);
            }
            false
        })
    } else {
        // no request data to repeat
        VarHandle::dummy()
    };

    let r_weak = r.downgrade();
    // sets `r` to an error track when the view-process reports a decode error for this handle
    let decode_error_handle = RAW_AUDIO_DECODE_ERROR_EVENT.hook(move |args| match r_weak.upgrade() {
        Some(r) => {
            if let Some(handle) = args.handle.upgrade()
                && r.with(|aud| aud.view_handle() == &handle)
            {
                r.set(AudioTrack::new_error(args.error.clone()));
                false
            } else {
                // event for another audio, result depends on this one still loading
                r.with(AudioTrack::is_loading)
            }
        }
        None => false,
    });

    let r_weak = r.downgrade();
    // updates the metadata of `r`, or inserts a new track when the metadata is for a child of `r`
    let decode_meta_handle = RAW_AUDIO_METADATA_DECODED_EVENT.hook(move |args| match r_weak.upgrade() {
        Some(r) => {
            let handle = match args.handle.upgrade() {
                Some(h) => h,
                None => return r.with(AudioTrack::is_loading),
            };
            if r.with(|aud| aud.view_handle() == &handle) {
                // metadata for this audio
                let meta = args.meta.clone();
                r.modify(move |i| i.meta = meta);
            } else if let Some(p) = &args.meta.parent
                && p.parent == r.with(|aud| aud.view_handle().audio_id())
            {
                // metadata for an audio that declares `r` as parent, start tracking it as a track of `r`
                let mut decoded = AudioDecoded::default();
                decoded.id = args.meta.id;
                let track = var(AudioTrack::new(None, handle.clone(), args.meta.clone(), decoded.clone()));
                r.modify(clmv!(track, |i| i.insert_track(track)));
                // the track has no cache key and no respawn data of its own
                audio_view(None, handle, args.meta.clone(), decoded, None, track);
            }
            r.with(AudioTrack::is_loading)
        }
        None => false,
    });

    let r_weak = r.downgrade();
    // copies decoded data into `r`, this hook is the only one registered as permanent
    RAW_AUDIO_DECODED_EVENT
        .hook(move |args| {
            // moves the other hook handles into this closure
            // NOTE(review): presumably this keeps those hooks alive with this one, confirm
            let _hold = [&decoding_respawn_handle, &decode_error_handle, &decode_meta_handle];
            match r_weak.upgrade() {
                Some(r) => {
                    if let Some(handle) = args.handle.upgrade()
                        && r.with(|aud| aud.view_handle() == &handle)
                    {
                        let data = args.audio.upgrade().unwrap();
                        // more data is expected until a full decode is received
                        let is_loading = !data.is_full;
                        r.modify(move |i| i.data = (*data.0).clone());
                        if !is_loading {
                            audio_decoded(r);
                        }
                        is_loading
                    } else {
                        // event for another audio, result depends on this one still loading
                        r.with(AudioTrack::is_loading)
                    }
                }
                None => false,
            }
        })
        .perm();
}
693fn audio_decoded(r: Var<AudioTrack>) {
695 let r_weak = r.downgrade();
696 VIEW_PROCESS_INITED_EVENT
697 .hook(move |_| {
698 if let Some(r) = r_weak.upgrade() {
699 let aud = r.get();
700 if !aud.is_loaded() {
701 return false;
703 }
704
705 let options = AudioOptions::none();
707 let format = AudioDataFormat::InterleavedF32 {
708 channel_count: aud.channel_count(),
709 sample_rate: aud.sample_rate(),
710 total_duration: aud.total_duration(),
711 };
712 audio_data(
713 true,
714 aud.cache_key,
715 format,
716 aud.chunk().into_inner(),
717 options,
718 AudioLimits::none(),
719 r,
720 );
721 }
722 false
723 })
724 .perm();
725}