zng_var/var_impl/
response_var.rs

1//! Special `Var<Response>` type impls
2
3use crate::{AnyVar, Var};
4
5use super::*;
6
7/// New paired [`ResponderVar`] and [`ResponseVar`] in the waiting state.
8pub fn response_var<T: VarValue>() -> (ResponderVar<T>, ResponseVar<T>) {
9    let responder = var(Response::Waiting::<T>);
10    let response = responder.read_only();
11    (ResponderVar(responder), ResponseVar(response))
12}
13
14/// New [`ResponseVar`] in the done state.
15pub fn response_done_var<T: VarValue>(response: T) -> ResponseVar<T> {
16    ResponseVar(var(Response::Done(response)).read_only())
17}
18
19/// Represents a read-write variable used to notify the completion of an async operation.
20///
21/// Use [`response_var`] to init.
22#[derive(Clone)]
23pub struct ResponderVar<T: VarValue>(Var<Response<T>>);
24
25/// Represents a read-only variable used to listen to a one time signal that an async operation has completed.
26///
27/// Use [`response_var`] or [`response_done_var`] to init.
28#[derive(Clone)]
29pub struct ResponseVar<T: VarValue>(Var<Response<T>>);
30
31impl<T: VarValue> ops::Deref for ResponderVar<T> {
32    type Target = Var<Response<T>>;
33
34    fn deref(&self) -> &Self::Target {
35        &self.0
36    }
37}
38impl<T: VarValue> IntoVar<Response<T>> for ResponderVar<T> {
39    fn into_var(self) -> Var<Response<T>> {
40        self.0
41    }
42}
43impl<T: VarValue> From<ResponderVar<T>> for Var<Response<T>> {
44    fn from(var: ResponderVar<T>) -> Self {
45        var.0
46    }
47}
48impl<T: VarValue> From<ResponderVar<T>> for AnyVar {
49    fn from(var: ResponderVar<T>) -> Self {
50        var.0.into()
51    }
52}
53
54impl<T: VarValue> ops::Deref for ResponseVar<T> {
55    type Target = Var<Response<T>>;
56
57    fn deref(&self) -> &Self::Target {
58        &self.0
59    }
60}
61impl<T: VarValue> IntoVar<Response<T>> for ResponseVar<T> {
62    fn into_var(self) -> Var<Response<T>> {
63        self.0
64    }
65}
66impl<T: VarValue> From<ResponseVar<T>> for Var<Response<T>> {
67    fn from(var: ResponseVar<T>) -> Self {
68        var.0
69    }
70}
71impl<T: VarValue> From<ResponseVar<T>> for AnyVar {
72    fn from(var: ResponseVar<T>) -> Self {
73        var.0.into()
74    }
75}
76impl<T: VarValue> From<Var<Response<T>>> for ResponderVar<T> {
77    fn from(var: Var<Response<T>>) -> Self {
78        if var.capabilities().is_always_read_only() {
79            tracing::error!("creating `ResponderVar` from `is_always_read_only` var");
80        }
81        ResponderVar(var)
82    }
83}
84impl<T: VarValue> From<Var<Response<T>>> for ResponseVar<T> {
85    fn from(var: Var<Response<T>>) -> Self {
86        ResponseVar(var.read_only())
87    }
88}
89
90/// Raw value in a [`ResponseVar`].
91#[derive(Clone, Copy, PartialEq)]
92pub enum Response<T: VarValue> {
93    /// Responder has not set the response yet.
94    Waiting,
95    /// Responder has set the response.
96    Done(T),
97}
98impl<T: VarValue> Response<T> {
99    /// Has response.
100    pub fn is_done(&self) -> bool {
101        matches!(self, Response::Done(_))
102    }
103
104    /// Does not have response.
105    pub fn is_waiting(&self) -> bool {
106        matches!(self, Response::Waiting)
107    }
108
109    /// Gets the response if done.
110    pub fn done(&self) -> Option<&T> {
111        match self {
112            Response::Waiting => None,
113            Response::Done(r) => Some(r),
114        }
115    }
116}
117impl<T: VarValue> fmt::Debug for Response<T> {
118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119        if f.alternate() {
120            match self {
121                Response::Waiting => {
122                    write!(f, "Response::Waiting")
123                }
124                Response::Done(v) => f.debug_tuple("Response::Done").field(v).finish(),
125            }
126        } else {
127            match self {
128                Response::Waiting => {
129                    write!(f, "Waiting")
130                }
131                Response::Done(v) => fmt::Debug::fmt(v, f),
132            }
133        }
134    }
135}
136impl<T: VarValue> From<Response<T>> for Option<T> {
137    fn from(value: Response<T>) -> Self {
138        match value {
139            Response::Waiting => None,
140            Response::Done(r) => Some(r),
141        }
142    }
143}
144impl<T: VarValue> From<Response<Option<T>>> for Option<T> {
145    fn from(value: Response<Option<T>>) -> Self {
146        match value {
147            Response::Waiting => None,
148            Response::Done(r) => r,
149        }
150    }
151}
152
153impl<T: VarValue> ResponseVar<T> {
154    /// Visit the response, if present.
155    pub fn with_rsp<R>(&self, read: impl FnOnce(&T) -> R) -> Option<R> {
156        self.with(|value| match value {
157            Response::Waiting => None,
158            Response::Done(value) => Some(read(value)),
159        })
160    }
161
162    /// Visit the response, if present and new.
163    pub fn with_new_rsp<R>(&self, read: impl FnOnce(&T) -> R) -> Option<R> {
164        self.with_new(|value| match value {
165            Response::Waiting => None,
166            Response::Done(value) => Some(read(value)),
167        })
168        .flatten()
169    }
170
171    /// If the response is received.
172    pub fn is_done(&self) -> bool {
173        self.with(Response::is_done)
174    }
175
176    /// If the response is not received yet.
177    pub fn is_waiting(&self) -> bool {
178        self.with(Response::is_waiting)
179    }
180
181    /// Clone the response value, if present.
182    pub fn rsp(&self) -> Option<T> {
183        self.with_rsp(Clone::clone)
184    }
185
186    /// Returns a future that awaits until a response is received and then returns a clone.
187    pub async fn wait_rsp(&self) -> T {
188        self.wait_done().await;
189        self.rsp().unwrap()
190    }
191
192    /// Returns a future that awaits until a response is received.
193    ///
194    /// [`rsp`]: Self::rsp
195    pub async fn wait_done(&self) {
196        self.wait_match(Response::is_done).await;
197    }
198
199    /// Clone the response, if present and new.
200    pub fn rsp_new(&self) -> Option<T> {
201        self.with_new_rsp(Clone::clone)
202    }
203
204    /// Map the response value using `map`, if the variable is awaiting a response uses the `waiting_value` first.
205    pub fn map_rsp<O, I, M>(&self, waiting_value: I, map: M) -> Var<O>
206    where
207        O: VarValue,
208        I: Fn() -> O + Send + Sync + 'static,
209        M: FnOnce(&T) -> O + Send + 'static,
210    {
211        let mut map = Some(map);
212        self.filter_map(
213            move |r| match r {
214                Response::Waiting => None,
215                Response::Done(r) => map.take().map(|m| m(r)),
216            },
217            waiting_value,
218        )
219    }
220
221    /// Map to another response variable.
222    pub fn map_response<O, M>(&self, mut map: M) -> ResponseVar<O>
223    where
224        O: VarValue,
225        M: FnMut(&T) -> O + Send + 'static,
226    {
227        ResponseVar(self.map(move |r| match r {
228            Response::Waiting => Response::Waiting,
229            Response::Done(t) => Response::Done(map(t)),
230        }))
231    }
232}
233impl<T: VarValue> IntoFuture for ResponseVar<T> {
234    type Output = T;
235
236    // refactor after 'impl_trait_in_assoc_type' is stable
237    type IntoFuture = std::pin::Pin<Box<dyn Future<Output = T> + Send + Sync>>;
238
239    fn into_future(self) -> Self::IntoFuture {
240        Box::pin(async move { self.wait_rsp().await })
241    }
242}
243
244impl<T: VarValue> ResponderVar<T> {
245    /// Sets the one time response.
246    pub fn respond(&self, response: T) {
247        self.set(Response::Done(response));
248    }
249
250    /// Creates a [`ResponseVar`] linked to this responder.
251    pub fn response_var(&self) -> ResponseVar<T> {
252        ResponseVar(self.read_only())
253    }
254}