1#![doc(html_favicon_url = "https://zng-ui.github.io/res/zng-logo-icon.png")]
2#![doc(html_logo_url = "https://zng-ui.github.io/res/zng-logo.png")]
3#![doc = include_str!(concat!("../", std::env!("CARGO_PKG_README")))]
15#![warn(unused_extern_crates)]
16#![warn(missing_docs)]
17
18use std::{mem, path::PathBuf, pin::Pin};
19
20use zng_app::{
21 update::UPDATES,
22 view_process::{
23 VIEW_PROCESS, VIEW_PROCESS_INITED_EVENT, ViewAudioHandle,
24 raw_events::{RAW_AUDIO_DECODE_ERROR_EVENT, RAW_AUDIO_DECODED_EVENT, RAW_AUDIO_METADATA_DECODED_EVENT},
25 },
26};
27use zng_app_context::app_local;
28use zng_clone_move::clmv;
29use zng_task::channel::IpcBytes;
30use zng_txt::ToTxt;
31use zng_unique_id::{IdEntry, IdMap};
32use zng_unit::ByteLength;
33use zng_var::{ResponseVar, Var, VarHandle, response_var, var};
34use zng_view_api::audio::{AudioDecoded, AudioMetadata, AudioRequest};
35
36mod types;
37pub use types::*;
38
39mod output;
40pub use output::*;
41
42app_local! {
43 static AUDIOS_SV: AudiosService = AudiosService::new();
44}
45
46struct AudiosService {
47 load_in_headless: Var<bool>,
48 limits: Var<AudioLimits>,
49
50 extensions: Vec<Box<dyn AudiosExtension>>,
51
52 cache: IdMap<AudioHash, AudioVar>,
53 outputs: IdMap<AudioOutputId, WeakAudioOutput>,
54 perm_outputs: IdMap<AudioOutputId, AudioOutput>,
55}
56impl AudiosService {
57 pub fn new() -> Self {
58 Self {
59 load_in_headless: var(false),
60 limits: var(AudioLimits::default()),
61
62 extensions: vec![],
63
64 cache: IdMap::new(),
65 outputs: IdMap::new(),
66 perm_outputs: IdMap::new(),
67 }
68 }
69}
70
71pub struct AUDIOS;
79impl AUDIOS {
80 pub fn load_in_headless(&self) -> Var<bool> {
90 AUDIOS_SV.read().load_in_headless.clone()
91 }
92
93 pub fn limits(&self) -> Var<AudioLimits> {
95 AUDIOS_SV.read().limits.clone()
96 }
97
98 pub fn read(&self, path: impl Into<PathBuf>) -> AudioVar {
104 self.audio_impl(path.into().into(), AudioOptions::cache(), None)
105 }
106
107 #[cfg(feature = "http")]
116 pub fn download<U>(&self, uri: U, accept: Option<zng_txt::Txt>) -> AudioVar
117 where
118 U: TryInto<zng_task::http::Uri>,
119 <U as TryInto<zng_task::http::Uri>>::Error: ToTxt,
120 {
121 match uri.try_into() {
122 Ok(uri) => self.audio_impl(AudioSource::Download(uri, accept), AudioOptions::cache(), None),
123 Err(e) => zng_var::const_var(AudioTrack::new_error(e.to_txt())),
124 }
125 }
126
127 pub fn from_static(&self, data: &'static [u8], format: impl Into<AudioDataFormat>) -> AudioVar {
147 self.audio_impl((data, format.into()).into(), AudioOptions::cache(), None)
148 }
149
150 pub fn from_data(&self, data: IpcBytes, format: impl Into<AudioDataFormat>) -> AudioVar {
158 self.audio_impl((data, format.into()).into(), AudioOptions::cache(), None)
159 }
160
161 pub fn audio(&self, source: impl Into<AudioSource>, options: AudioOptions, limits: Option<AudioLimits>) -> AudioVar {
170 self.audio_impl(source.into(), options, limits)
171 }
172 fn audio_impl(&self, source: AudioSource, options: AudioOptions, limits: Option<AudioLimits>) -> AudioVar {
173 let r = var(AudioTrack::new_loading());
174 let ri = r.read_only();
175 UPDATES.once_update("AUDIOS.audio", move || {
176 audio(source, options, limits, r);
177 });
178 ri
179 }
180
181 pub fn audio_task<F>(&self, source: impl IntoFuture<IntoFuture = F>, options: AudioOptions, limits: Option<AudioLimits>) -> AudioVar
193 where
194 F: Future<Output = AudioSource> + Send + 'static,
195 {
196 self.audio_task_impl(Box::pin(source.into_future()), options, limits)
197 }
198 fn audio_task_impl(
199 &self,
200 source: Pin<Box<dyn Future<Output = AudioSource> + Send + 'static>>,
201 options: AudioOptions,
202 limits: Option<AudioLimits>,
203 ) -> AudioVar {
204 let r = var(AudioTrack::new_loading());
205 let ri = r.read_only();
206 zng_task::spawn(async move {
207 let source = source.await;
208 audio(source, options, limits, r);
209 });
210 ri
211 }
212
213 pub fn register(&self, key: Option<AudioHash>, audio: (ViewAudioHandle, AudioMetadata, AudioDecoded)) -> AudioVar {
221 let r = var(AudioTrack::new_loading());
222 let rr = r.read_only();
223 UPDATES.once_update("AUDIOS.register", move || {
224 audio_view(key, audio.0, audio.1, audio.2, None, r);
225 });
226 rr
227 }
228
229 pub fn clean(&self, key: AudioHash) {
234 UPDATES.once_update("AUDIOS.clean", move || {
235 if let IdEntry::Occupied(e) = AUDIOS_SV.write().cache.entry(key)
236 && e.get().strong_count() == 1
237 {
238 e.remove();
239 }
240 });
241 }
242
243 pub fn purge(&self, key: AudioHash) {
248 UPDATES.once_update("AUDIOS.purge", move || {
249 AUDIOS_SV.write().cache.remove(&key);
250 });
251 }
252
253 pub fn cache_key(&self, audio: &AudioTrack) -> Option<AudioHash> {
255 let key = audio.cache_key?;
256 if AUDIOS_SV.read().cache.contains_key(&key) {
257 Some(key)
258 } else {
259 None
260 }
261 }
262
263 pub fn is_cached(&self, audio: &AudioTrack) -> bool {
265 match &audio.cache_key {
266 Some(k) => AUDIOS_SV.read().cache.contains_key(k),
267 None => false,
268 }
269 }
270
271 pub fn clean_all(&self) {
273 UPDATES.once_update("AUDIOS.clean_all", || {
274 AUDIOS_SV.write().cache.retain(|_, v| v.strong_count() > 1);
275 });
276 }
277
278 pub fn purge_all(&self) {
283 UPDATES.once_update("AUDIOS.purge_all", || {
284 AUDIOS_SV.write().cache.clear();
285 });
286 }
287
288 pub fn extend(&self, extension: Box<dyn AudiosExtension>) {
292 UPDATES.once_update("AUDIOS.extend", move || {
293 AUDIOS_SV.write().extensions.push(extension);
294 });
295 }
296
297 pub fn available_formats(&self) -> Vec<AudioFormat> {
299 let mut formats = VIEW_PROCESS.info().audio.clone();
300
301 let mut exts = mem::take(&mut AUDIOS_SV.write().extensions);
302 for ext in exts.iter_mut() {
303 ext.available_formats(&mut formats);
304 }
305 let mut s = AUDIOS_SV.write();
306 exts.append(&mut s.extensions);
307 s.extensions = exts;
308
309 formats
310 }
311
312 #[cfg(feature = "http")]
313 fn http_accept(&self) -> zng_txt::Txt {
314 let mut s = String::new();
315 let mut sep = "";
316 for f in self.available_formats() {
317 for f in f.media_type_suffixes_iter() {
318 s.push_str(sep);
319 s.push_str("audio/");
320 s.push_str(f);
321 sep = ",";
322 }
323 }
324 s.into()
325 }
326}
327
328impl AUDIOS {
329 pub fn open_output(
340 &self,
341 id: impl Into<AudioOutputId>,
342 init: impl FnOnce(&mut AudioOutputOptions) + Send + 'static,
343 ) -> ResponseVar<AudioOutput> {
344 self.open_audio_out(id.into(), Box::new(init))
345 }
346
347 fn open_audio_out(&self, id: AudioOutputId, init: Box<dyn FnOnce(&mut AudioOutputOptions) + Send>) -> ResponseVar<AudioOutput> {
348 let (responder, r) = response_var();
349 UPDATES.once_update("AUDIOS.open_output", move || match AUDIOS_SV.write().outputs.entry(id) {
350 IdEntry::Occupied(mut e) => match e.get().upgrade() {
351 Some(r) => responder.respond(r),
352 None => {
353 let mut opt = AudioOutputOptions::default();
354 init(&mut opt);
355 let r = AudioOutput::open(id, opt);
356 e.insert(r.downgrade());
357 responder.respond(r);
358 }
359 },
360 IdEntry::Vacant(e) => {
361 let mut opt = AudioOutputOptions::default();
362 init(&mut opt);
363 let r = AudioOutput::open(id, opt);
364 e.insert(r.downgrade());
365 responder.respond(r);
366 }
367 });
368 r
369 }
370
371 pub(crate) fn perm_output(&self, output: &AudioOutput) {
372 AUDIOS_SV.write().perm_outputs.entry(output.id()).or_insert_with(|| output.clone());
373 }
374}
375
376fn audio(mut source: AudioSource, mut options: AudioOptions, limits: Option<AudioLimits>, r: Var<AudioTrack>) {
377 let mut s = AUDIOS_SV.write();
378
379 let limits = limits.unwrap_or_else(|| s.limits.get());
380
381 let mut exts = mem::take(&mut s.extensions);
383 drop(s); for ext in &mut exts {
385 ext.audio(&limits, &mut source, &mut options);
386 }
387 let mut s = AUDIOS_SV.write();
388 exts.append(&mut s.extensions);
389 s.extensions = exts;
390
391 if let AudioSource::Audio(var) = source {
392 var.set_bind(&r).perm();
394 r.hold(var).perm();
395 return;
396 }
397
398 if !VIEW_PROCESS.is_available() && !s.load_in_headless.get() {
399 tracing::debug!("ignoring audio request due headless mode");
400 return;
401 }
402
403 let key = source.hash128(&options).unwrap();
404
405 match options.cache_mode {
407 AudioCacheMode::Ignore => (),
408 AudioCacheMode::Cache => {
409 match s.cache.entry(key) {
410 IdEntry::Occupied(e) => {
411 let var = e.get();
413 var.set_bind(&r).perm();
414 r.hold(var.clone()).perm();
415 return;
416 }
417 IdEntry::Vacant(e) => {
418 e.insert(r.clone());
420 }
421 }
422 }
423 AudioCacheMode::Retry => {
424 match s.cache.entry(key) {
425 IdEntry::Occupied(mut e) => {
426 let var = e.get();
427 if var.with(AudioTrack::is_error) {
428 r.set_bind(var).perm();
433 var.hold(r.clone()).perm();
434
435 e.insert(r.clone());
437 } else {
438 var.set_bind(&r).perm();
440 r.hold(var.clone()).perm();
441 return;
442 }
443 }
444 IdEntry::Vacant(e) => {
445 e.insert(r.clone());
447 }
448 }
449 }
450 AudioCacheMode::Reload => {
451 match s.cache.entry(key) {
452 IdEntry::Occupied(mut e) => {
453 let var = e.get();
454 r.set_bind(var).perm();
455 var.hold(r.clone()).perm();
456
457 e.insert(r.clone());
458 }
459 IdEntry::Vacant(e) => {
460 e.insert(r.clone());
462 }
463 }
464 }
465 }
466 drop(s);
467
468 match source {
469 AudioSource::Read(path) => {
470 fn read(path: PathBuf, limit: ByteLength) -> std::io::Result<IpcBytes> {
471 let file = std::fs::File::open(path)?;
472 if file.metadata()?.len() > limit.bytes() as u64 {
473 return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "file length exceeds limit"));
474 }
475 IpcBytes::from_file_blocking(file)
476 }
477 let limit = limits.max_encoded_len;
478 let data_format = match path.extension() {
479 Some(ext) => AudioDataFormat::FileExtension(ext.to_string_lossy().to_txt()),
480 None => AudioDataFormat::Unknown,
481 };
482 zng_task::spawn_wait(move || match read(path, limit) {
483 Ok(data) => audio_data(false, Some(key), data_format, data, options, limits, r),
484 Err(e) => {
485 r.set(AudioTrack::new_error(e.to_txt()));
486 }
487 });
488 }
489 #[cfg(feature = "http")]
490 AudioSource::Download(uri, accept) => {
491 let accept = accept.unwrap_or_else(|| AUDIOS.http_accept());
492
493 use zng_task::http::*;
494 async fn download(uri: Uri, accept: zng_txt::Txt, limit: ByteLength) -> Result<(AudioDataFormat, IpcBytes), Error> {
495 let request = Request::get(uri)?.max_length(limit).header(header::ACCEPT, accept.as_str())?;
496 let mut response = send(request).await?;
497 let data_format = match response.header().get(&header::CONTENT_TYPE).and_then(|m| m.to_str().ok()) {
498 Some(m) => AudioDataFormat::MimeType(m.to_txt()),
499 None => AudioDataFormat::Unknown,
500 };
501 let data = response.body().await?;
502
503 Ok((data_format, data))
504 }
505
506 let limit = limits.max_encoded_len;
507 zng_task::spawn(async move {
508 match download(uri, accept, limit).await {
509 Ok((fmt, data)) => {
510 audio_data(false, Some(key), fmt, data, options, limits, r);
511 }
512 Err(e) => r.set(AudioTrack::new_error(e.to_txt())),
513 }
514 });
515 }
516 AudioSource::Data(_, data, format) => audio_data(false, Some(key), format, data, options, limits, r),
517 _ => unreachable!(),
518 }
519}
520
521fn audio_data(
523 is_respawn: bool,
524 cache_key: Option<AudioHash>,
525 format: AudioDataFormat,
526 data: IpcBytes,
527 options: AudioOptions,
528 limits: AudioLimits,
529 r: Var<AudioTrack>,
530) {
531 if !is_respawn && let Some(key) = cache_key {
532 let mut replaced = false;
533 let mut exts = mem::take(&mut AUDIOS_SV.write().extensions);
534 for ext in &mut exts {
535 if let Some(replacement) = ext.audio_data(limits.max_decoded_len, &key, &data, &format, &options) {
536 replacement.set_bind(&r).perm();
537 r.hold(replacement).perm();
538
539 replaced = true;
540 break;
541 }
542 }
543
544 {
545 let mut s = AUDIOS_SV.write();
546 exts.append(&mut s.extensions);
547 s.extensions = exts;
548
549 if replaced {
550 return;
551 }
552 }
553 }
554
555 if !VIEW_PROCESS.is_available() {
556 tracing::debug!("ignoring audio view request after test load due to headless mode");
557 return;
558 }
559
560 let mut request = AudioRequest::new(format.clone(), data.clone(), limits.max_decoded_len.bytes() as u64);
561 request.tracks = options.tracks;
562
563 let try_gen = VIEW_PROCESS.generation();
564
565 match VIEW_PROCESS.add_audio(request) {
566 Ok(view_img) => audio_view(
567 cache_key,
568 view_img,
569 AudioMetadata::default(),
570 AudioDecoded::default(),
571 Some((format, data, options, limits)),
572 r,
573 ),
574 Err(_) => {
575 tracing::debug!("audio view request failed, will retry on respawn");
576
577 zng_task::spawn(async move {
578 VIEW_PROCESS_INITED_EVENT.wait_match(move |a| a.generation != try_gen).await;
579 audio_data(true, cache_key, format, data, options, limits, r);
580 });
581 }
582 }
583}
584fn audio_view(
586 cache_key: Option<AudioHash>,
587 handle: ViewAudioHandle,
588 meta: AudioMetadata,
589 decoded: AudioDecoded,
590 respawn_data: Option<(AudioDataFormat, IpcBytes, AudioOptions, AudioLimits)>,
591 r: Var<AudioTrack>,
592) {
593 let aud = AudioTrack::new(cache_key, handle, meta, decoded);
594 let is_loaded = aud.is_loaded();
595 let is_dummy = aud.view_handle().is_dummy();
596 r.set(aud);
597
598 if is_loaded {
599 audio_decoded(r);
600 return;
601 }
602
603 if is_dummy {
604 tracing::error!("tried to register dummy handle");
605 return;
606 }
607
608 let decoding_respawn_handle = if respawn_data.is_some() {
610 let r_weak = r.downgrade();
611 let mut respawn_data = respawn_data;
612 VIEW_PROCESS_INITED_EVENT.hook(move |_| {
613 if let Some(r) = r_weak.upgrade() {
614 let (format, data, options, limits) = respawn_data.take().unwrap();
615 audio_data(true, cache_key, format, data, options, limits, r);
616 }
617 false
618 })
619 } else {
620 VarHandle::dummy()
622 };
623
624 let r_weak = r.downgrade();
626 let decode_error_handle = RAW_AUDIO_DECODE_ERROR_EVENT.hook(move |args| match r_weak.upgrade() {
627 Some(r) => {
628 if r.with(|aud| aud.view_handle() == &args.handle.upgrade().unwrap()) {
629 r.set(AudioTrack::new_error(args.error.clone()));
630 false
631 } else {
632 r.with(AudioTrack::is_loading)
633 }
634 }
635 None => false,
636 });
637
638 let r_weak = r.downgrade();
640 let decode_meta_handle = RAW_AUDIO_METADATA_DECODED_EVENT.hook(move |args| match r_weak.upgrade() {
641 Some(r) => {
642 if r.with(|aud| aud.view_handle() == &args.handle.upgrade().unwrap()) {
643 let meta = args.meta.clone();
644 r.modify(move |i| i.meta = meta);
645 } else if let Some(p) = &args.meta.parent
646 && p.parent == r.with(|aud| aud.view_handle().audio_id())
647 {
648 let mut decoded = AudioDecoded::default();
650 decoded.id = args.meta.id;
651 let track = var(AudioTrack::new(
652 None,
653 args.handle.upgrade().unwrap(),
654 args.meta.clone(),
655 decoded.clone(),
656 ));
657 r.modify(clmv!(track, |i| i.insert_track(track)));
658 audio_view(None, args.handle.upgrade().unwrap(), args.meta.clone(), decoded, None, track);
659 }
660 r.with(AudioTrack::is_loading)
661 }
662 None => false,
663 });
664
665 let r_weak = r.downgrade();
667 RAW_AUDIO_DECODED_EVENT
668 .hook(move |args| {
669 let _hold = [&decoding_respawn_handle, &decode_error_handle, &decode_meta_handle];
670 match r_weak.upgrade() {
671 Some(r) => {
672 if r.with(|aud| aud.view_handle() == &args.handle.upgrade().unwrap()) {
673 let data = args.audio.upgrade().unwrap();
674 let is_loading = !data.is_full;
675 r.modify(move |i| i.data = (*data.0).clone());
676 if !is_loading {
677 audio_decoded(r);
678 }
679 is_loading
680 } else {
681 r.with(AudioTrack::is_loading)
682 }
683 }
684 None => false,
685 }
686 })
687 .perm();
688}
689fn audio_decoded(r: Var<AudioTrack>) {
691 let r_weak = r.downgrade();
692 VIEW_PROCESS_INITED_EVENT
693 .hook(move |_| {
694 if let Some(r) = r_weak.upgrade() {
695 let aud = r.get();
696 if !aud.is_loaded() {
697 return false;
699 }
700
701 let options = AudioOptions::none();
703 let format = AudioDataFormat::InterleavedF32 {
704 channel_count: aud.channel_count(),
705 sample_rate: aud.sample_rate(),
706 total_duration: aud.total_duration(),
707 };
708 audio_data(
709 true,
710 aud.cache_key,
711 format,
712 aud.chunk().into_inner(),
713 options,
714 AudioLimits::none(),
715 r,
716 );
717 }
718 false
719 })
720 .perm();
721}