zng_var/
response.rs

1use super::*;
2
3/// New paired [`ResponderVar`] and [`ResponseVar`] in the waiting state.
4pub fn response_var<T: VarValue>() -> (ResponderVar<T>, ResponseVar<T>) {
5    let responder = var(Response::Waiting::<T>);
6    let response = responder.read_only();
7    (responder, response)
8}
9
10/// New [`ResponseVar`] in the done state.
11pub fn response_done_var<T: VarValue>(response: T) -> ResponseVar<T> {
12    var(Response::Done(response)).read_only()
13}
14
15/// Variable used to notify the completion of an async operation.
16///
17/// Use [`response_var`] to init.
18pub type ResponderVar<T> = ArcVar<Response<T>>;
19
20/// Variable used to listen to a one time signal that an async operation has completed.
21///
22/// Use [`response_var`] or [`response_done_var`] to init.
23pub type ResponseVar<T> = types::ReadOnlyVar<Response<T>, ArcVar<Response<T>>>;
24/// Raw value in a [`ResponseVar`].
25#[derive(Clone, Copy, PartialEq)]
26pub enum Response<T: VarValue> {
27    /// Responder has not set the response yet.
28    Waiting,
29    /// Responder has set the response.
30    Done(T),
31}
32impl<T: VarValue> Response<T> {
33    /// Has response.
34    pub fn is_done(&self) -> bool {
35        matches!(self, Response::Done(_))
36    }
37
38    /// Does not have response.
39    pub fn is_waiting(&self) -> bool {
40        matches!(self, Response::Waiting)
41    }
42
43    /// Gets the response if done.
44    pub fn done(&self) -> Option<&T> {
45        match self {
46            Response::Waiting => None,
47            Response::Done(r) => Some(r),
48        }
49    }
50}
51impl<T: VarValue> fmt::Debug for Response<T> {
52    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53        if f.alternate() {
54            match self {
55                Response::Waiting => {
56                    write!(f, "Response::Waiting")
57                }
58                Response::Done(v) => f.debug_tuple("Response::Done").field(v).finish(),
59            }
60        } else {
61            match self {
62                Response::Waiting => {
63                    write!(f, "Waiting")
64                }
65                Response::Done(v) => fmt::Debug::fmt(v, f),
66            }
67        }
68    }
69}
70impl<T: VarValue> From<Response<T>> for Option<T> {
71    fn from(value: Response<T>) -> Self {
72        match value {
73            Response::Waiting => None,
74            Response::Done(r) => Some(r),
75        }
76    }
77}
78
79impl<T: VarValue> ResponseVar<T> {
80    /// Visit the response, if present.
81    pub fn with_rsp<R>(&self, read: impl FnOnce(&T) -> R) -> Option<R> {
82        self.with(|value| match value {
83            Response::Waiting => None,
84            Response::Done(value) => Some(read(value)),
85        })
86    }
87
88    /// Visit the response, if present and new.
89    pub fn with_new_rsp<R>(&self, read: impl FnOnce(&T) -> R) -> Option<R> {
90        self.with_new(|value| match value {
91            Response::Waiting => None,
92            Response::Done(value) => Some(read(value)),
93        })
94        .flatten()
95    }
96
97    /// If the response is received.
98    pub fn is_done(&self) -> bool {
99        self.with(Response::is_done)
100    }
101
102    /// If the response is not received yet.
103    pub fn is_waiting(&self) -> bool {
104        self.with(Response::is_waiting)
105    }
106
107    /// Clone the response value, if present.
108    pub fn rsp(&self) -> Option<T> {
109        self.with_rsp(Clone::clone)
110    }
111
112    /// Returns a future that awaits until a response is received and then returns a clone.
113    pub async fn wait_rsp(&self) -> T {
114        self.wait_done().await;
115        self.rsp().unwrap()
116    }
117
118    /// Returns a future that awaits until a response is received and then returns it.
119    ///
120    /// Will clone the value if the variable is shared.
121    ///
122    /// Note that `ResponseVar<T>` implements [`IntoFuture`] so you can also just `.await` the variable.
123    pub async fn wait_into_rsp(self) -> T {
124        self.wait_done().await;
125        self.into_rsp().unwrap()
126    }
127
128    /// Returns a future that awaits until a response is received.
129    ///
130    /// [`rsp`]: Self::rsp
131    pub async fn wait_done(&self) {
132        self.wait_value(Response::is_done).await;
133    }
134
135    /// Clone the response, if present and new.
136    pub fn rsp_new(&self) -> Option<T> {
137        self.with_new_rsp(Clone::clone)
138    }
139
140    /// Into response, if received.
141    ///
142    /// Clones if the variable is has more than one strong reference.
143    pub fn into_rsp(self) -> Option<T> {
144        self.into_value().into()
145    }
146
147    /// Map the response value using `map`, if the variable is awaiting a response uses the `waiting_value` first.
148    pub fn map_rsp<O, I, M>(&self, waiting_value: I, map: M) -> impl Var<O>
149    where
150        O: VarValue,
151        I: Fn() -> O + Send + Sync + 'static,
152        M: FnOnce(&T) -> O + Send + 'static,
153    {
154        let mut map = Some(map);
155        self.filter_map(
156            move |r| match r {
157                Response::Waiting => None,
158                Response::Done(r) => map.take().map(|m| m(r)),
159            },
160            waiting_value,
161        )
162    }
163
164    /// Map to another response variable.
165    pub fn map_response<O, M>(&self, mut map: M) -> ResponseVar<O>
166    where
167        O: VarValue,
168        M: FnMut(&T) -> O + Send + 'static,
169    {
170        self.map(move |r| match r {
171            Response::Waiting => Response::Waiting,
172            Response::Done(t) => Response::Done(map(t)),
173        })
174    }
175}
176impl<T: VarValue> IntoFuture for ResponseVar<T> {
177    type Output = T;
178
179    // refactor after 'impl_trait_in_assoc_type' is stable
180    type IntoFuture = std::pin::Pin<Box<dyn Future<Output = T> + Send + Sync>>;
181
182    fn into_future(self) -> Self::IntoFuture {
183        Box::pin(self.wait_into_rsp())
184    }
185}
186
187impl<T: VarValue> ResponderVar<T> {
188    /// Sets the one time response.
189    pub fn respond(&self, response: T) {
190        self.set(Response::Done(response));
191    }
192
193    /// Creates a [`ResponseVar`] linked to this responder.
194    pub fn response_var(&self) -> ResponseVar<T> {
195        self.read_only()
196    }
197}