zng_var/var_impl/
response_var.rs1use crate::{AnyVar, Var};
4
5use super::*;
6
7pub fn response_var<T: VarValue>() -> (ResponderVar<T>, ResponseVar<T>) {
9 let responder = var(Response::Waiting::<T>);
10 let response = responder.read_only();
11 (ResponderVar(responder), ResponseVar(response))
12}
13
14pub fn response_done_var<T: VarValue>(response: T) -> ResponseVar<T> {
16 ResponseVar(var(Response::Done(response)).read_only())
17}
18
19#[derive(Clone)]
23pub struct ResponderVar<T: VarValue>(Var<Response<T>>);
24
25#[derive(Clone)]
29pub struct ResponseVar<T: VarValue>(Var<Response<T>>);
30
31impl<T: VarValue> ops::Deref for ResponderVar<T> {
32 type Target = Var<Response<T>>;
33
34 fn deref(&self) -> &Self::Target {
35 &self.0
36 }
37}
38impl<T: VarValue> IntoVar<Response<T>> for ResponderVar<T> {
39 fn into_var(self) -> Var<Response<T>> {
40 self.0
41 }
42}
43impl<T: VarValue> From<ResponderVar<T>> for Var<Response<T>> {
44 fn from(var: ResponderVar<T>) -> Self {
45 var.0
46 }
47}
48impl<T: VarValue> From<ResponderVar<T>> for AnyVar {
49 fn from(var: ResponderVar<T>) -> Self {
50 var.0.into()
51 }
52}
53
54impl<T: VarValue> ops::Deref for ResponseVar<T> {
55 type Target = Var<Response<T>>;
56
57 fn deref(&self) -> &Self::Target {
58 &self.0
59 }
60}
61impl<T: VarValue> IntoVar<Response<T>> for ResponseVar<T> {
62 fn into_var(self) -> Var<Response<T>> {
63 self.0
64 }
65}
66impl<T: VarValue> From<ResponseVar<T>> for Var<Response<T>> {
67 fn from(var: ResponseVar<T>) -> Self {
68 var.0
69 }
70}
71impl<T: VarValue> From<ResponseVar<T>> for AnyVar {
72 fn from(var: ResponseVar<T>) -> Self {
73 var.0.into()
74 }
75}
76impl<T: VarValue> From<Var<Response<T>>> for ResponderVar<T> {
77 fn from(var: Var<Response<T>>) -> Self {
78 if var.capabilities().is_always_read_only() {
79 tracing::error!("creating `ResponderVar` from `is_always_read_only` var");
80 }
81 ResponderVar(var)
82 }
83}
84impl<T: VarValue> From<Var<Response<T>>> for ResponseVar<T> {
85 fn from(var: Var<Response<T>>) -> Self {
86 ResponseVar(var.read_only())
87 }
88}
89
90#[derive(Clone, Copy, PartialEq)]
92pub enum Response<T: VarValue> {
93 Waiting,
95 Done(T),
97}
98impl<T: VarValue> Response<T> {
99 pub fn is_done(&self) -> bool {
101 matches!(self, Response::Done(_))
102 }
103
104 pub fn is_waiting(&self) -> bool {
106 matches!(self, Response::Waiting)
107 }
108
109 pub fn done(&self) -> Option<&T> {
111 match self {
112 Response::Waiting => None,
113 Response::Done(r) => Some(r),
114 }
115 }
116}
117impl<T: VarValue> fmt::Debug for Response<T> {
118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119 if f.alternate() {
120 match self {
121 Response::Waiting => {
122 write!(f, "Response::Waiting")
123 }
124 Response::Done(v) => f.debug_tuple("Response::Done").field(v).finish(),
125 }
126 } else {
127 match self {
128 Response::Waiting => {
129 write!(f, "Waiting")
130 }
131 Response::Done(v) => fmt::Debug::fmt(v, f),
132 }
133 }
134 }
135}
136impl<T: VarValue> From<Response<T>> for Option<T> {
137 fn from(value: Response<T>) -> Self {
138 match value {
139 Response::Waiting => None,
140 Response::Done(r) => Some(r),
141 }
142 }
143}
144impl<T: VarValue> From<Response<Option<T>>> for Option<T> {
145 fn from(value: Response<Option<T>>) -> Self {
146 match value {
147 Response::Waiting => None,
148 Response::Done(r) => r,
149 }
150 }
151}
152
153impl<T: VarValue> ResponseVar<T> {
154 pub fn with_rsp<R>(&self, read: impl FnOnce(&T) -> R) -> Option<R> {
156 self.with(|value| match value {
157 Response::Waiting => None,
158 Response::Done(value) => Some(read(value)),
159 })
160 }
161
162 pub fn with_new_rsp<R>(&self, read: impl FnOnce(&T) -> R) -> Option<R> {
164 self.with_new(|value| match value {
165 Response::Waiting => None,
166 Response::Done(value) => Some(read(value)),
167 })
168 .flatten()
169 }
170
171 pub fn is_done(&self) -> bool {
173 self.with(Response::is_done)
174 }
175
176 pub fn is_waiting(&self) -> bool {
178 self.with(Response::is_waiting)
179 }
180
181 pub fn rsp(&self) -> Option<T> {
183 self.with_rsp(Clone::clone)
184 }
185
186 pub async fn wait_rsp(&self) -> T {
188 self.wait_done().await;
189 self.rsp().unwrap()
190 }
191
192 pub async fn wait_done(&self) {
196 self.wait_match(Response::is_done).await;
197 }
198
199 pub fn rsp_new(&self) -> Option<T> {
201 self.with_new_rsp(Clone::clone)
202 }
203
204 pub fn map_rsp<O, I, M>(&self, waiting_value: I, map: M) -> Var<O>
206 where
207 O: VarValue,
208 I: Fn() -> O + Send + Sync + 'static,
209 M: FnOnce(&T) -> O + Send + 'static,
210 {
211 let mut map = Some(map);
212 self.filter_map(
213 move |r| match r {
214 Response::Waiting => None,
215 Response::Done(r) => map.take().map(|m| m(r)),
216 },
217 waiting_value,
218 )
219 }
220
221 pub fn map_response<O, M>(&self, mut map: M) -> ResponseVar<O>
223 where
224 O: VarValue,
225 M: FnMut(&T) -> O + Send + 'static,
226 {
227 ResponseVar(self.map(move |r| match r {
228 Response::Waiting => Response::Waiting,
229 Response::Done(t) => Response::Done(map(t)),
230 }))
231 }
232}
233impl<T: VarValue> IntoFuture for ResponseVar<T> {
234 type Output = T;
235
236 type IntoFuture = std::pin::Pin<Box<dyn Future<Output = T> + Send + Sync>>;
238
239 fn into_future(self) -> Self::IntoFuture {
240 Box::pin(async move { self.wait_rsp().await })
241 }
242}
243
244impl<T: VarValue> ResponderVar<T> {
245 pub fn respond(&self, response: T) {
247 self.set(Response::Done(response));
248 }
249
250 pub fn response_var(&self) -> ResponseVar<T> {
252 ResponseVar(self.read_only())
253 }
254}