0 | module FS.Concurrent.Signal
  1 |
  2 | import public Data.Linear.Sink
  3 | import Data.Linear.Deferred
  4 | import Data.Linear.Ref1
  5 | import Data.Linear.Traverse1
  6 | import Data.List.Quantifiers.Extra
  7 |
  8 | import FS.Pull
  9 |
 10 | import IO.Async
 11 |
 12 | %default total
 13 |
 14 | --------------------------------------------------------------------------------
 15 | -- Signals
 16 | --------------------------------------------------------------------------------
 17 |
 18 | record ST a where
 19 |   constructor SS
 20 |   value     : a
 21 |   last      : Nat
 22 |   listeners : List (Once World (a,Nat))
 23 |
 24 | %inline
 25 | putImpl : a -> ST a -> (ST a, IO1 ())
 26 | putImpl v (SS _ lst []) = (SS v (S lst) [], (() # ))
 27 | putImpl v (SS _ lst ls) =
 28 |   let n := S lst
 29 |    in (SS v n [], traverse1_ (\o => putOnce1 o (v,n)) ls)
 30 |
 31 | %inline
 32 | updImpl : (a -> (a,b)) -> ST a -> (ST a, IO1 b)
 33 | updImpl f (SS cur lst ls) =
 34 |   let n     := S lst
 35 |       (v,r) := f cur
 36 |    in ( SS v n []
 37 |       , \t => let _ # t := traverse1_ (\o => putOnce1 o (v,n)) ls t in r # t
 38 |       )
 39 |
 40 | %inline
 41 | nextImpl : Nat -> Once World (a,Nat) -> ST a -> (ST a, Async e es (a,Nat))
 42 | nextImpl last once st@(SS v lst ls) =
 43 |   case last == lst of
 44 |     False => (st, pure (v,lst))
 45 |     True  => (SS v lst (once :: ls), awaitOnce once)
 46 |
 47 | export
 48 | record SignalRef a where
 49 |   [noHints]
 50 |   constructor SR
 51 |   ref : Ref World (ST a)
 52 |
 53 | ||| An observable, mutable value
 54 | export
 55 | signal : Lift1 World f => a -> f (SignalRef a)
 56 | signal v = SR <$> newref (SS v 1 [])
 57 |
 58 | ||| Reads the current value of the signal.
 59 | export %inline
 60 | get : Lift1 World f => SignalRef a -> f a
 61 | get (SR ref) = value <$> readref ref
 62 |
 63 | ||| Writes the current value to the signal.
 64 | export
 65 | put1 : SignalRef a -> (v : a) -> IO1 ()
 66 | put1 (SR ref) v t =
 67 |  let act # t := casupdate1 ref (putImpl v) t
 68 |   in act t
 69 |
 70 | ||| Lifted version of `put1`.
 71 | export %inline
 72 | put : Lift1 World f => SignalRef a -> (v : a) -> f ()
 73 | put r = lift1 . put1 r
 74 |
 75 | ||| Updates the value stored in the signal with the given function
 76 | ||| and returns the second result of the computation.
 77 | export
 78 | update1 : SignalRef a -> (g : a -> (a,b)) -> IO1 b
 79 | update1 (SR ref) g t =
 80 |  let act # t := casupdate1 ref (updImpl g) t
 81 |   in act t
 82 |
 83 | ||| Lifted version of `update1`.
 84 | export %inline
 85 | update : Lift1 World f => SignalRef a -> (g : a -> (a,b)) -> f b
 86 | update r = lift1 . update1 r
 87 |
 88 | ||| Updates the value stored in the signal.
 89 | export %inline
 90 | modify1 : SignalRef a -> (g : a -> a) -> IO1 ()
 91 | modify1 s g = update1 s (\v => (g v, ()))
 92 |
 93 | ||| Lifted version of `modify1`.
 94 | export %inline
 95 | modify : Lift1 World f => SignalRef a -> (g : a -> a) -> f ()
 96 | modify r = lift1 . modify1 r
 97 |
 98 | ||| Updates the value stored in the signal and returns the result.
 99 | export %inline
100 | updateAndGet1 : SignalRef a -> (g : a -> a) -> IO1 a
101 | updateAndGet1 s g = update1 s (\v => let w := g v in (w,w))
102 |
103 | ||| Lifted version of `updateAndGet1`.
104 | export %inline
105 | updateAndGet : Lift1 World f => SignalRef a -> (g : a -> a) -> f a
106 | updateAndGet r = lift1 . updateAndGet1 r
107 |
108 | ||| Awaits the next value and its count, potentially blocking the
109 | ||| current fiber if the internal counter is at `current`.
110 | |||
111 | ||| Note: The internal counter starts at `1`, so invoking this with
112 | |||       a count of `0` will always immediately return the internal
113 | |||       value and count.
114 | export
115 | next : SignalRef a -> Nat -> Async e es (a,Nat)
116 | next (SR ref) n = do
117 |   def <- onceOf (a,Nat)
118 |   act <- update ref (nextImpl n def)
119 |   act
120 |
121 | export %inline
122 | signalSink : SignalRef t -> Sink t
123 | signalSink r = S (put1 r)
124 |
125 | public export
126 | interface Reference (0 t : Type -> Type) where
127 |   current1 : t a -> IO1 a
128 |
129 | export %inline
130 | current : LIO f => Reference t => t a -> f a
131 | current = lift1 . current1
132 |
133 | export %inline
134 | Reference IORef where
135 |   current1 = read1
136 |
137 | export %inline
138 | Reference SignalRef where
139 |   current1 = ioToF1 . get
140 |
141 | ||| Creates a continuous stream of values typically by reading
142 | ||| the current state of a mutable reference every time a value is
143 | ||| pulled.
144 | export
145 | continuous : LIO (f es) => Reference t => t a -> Stream f es a
146 | continuous = repeat . eval . current
147 |
148 | public export
149 | interface Discrete (0 t : Type -> Type) where
150 |   ||| Creates a discrete stream of values that reads an observable
151 |   ||| value every time it changes.
152 |   |||
153 |   ||| Note: For `Signal` and `Event`, there is no buffering of values.
154 |   |||       If the they are updated
155 |   |||       more quickly than the stream is being pulled, some values
156 |   |||       might be lost.
157 |   |||
158 |   |||       If you require buffering, use a `IO.Async.BQueue` (single observer)
159 |   |||       or a `IO.Async.Channel` (multiple observers).
160 |   discrete : t a -> Stream (Async e) es a
161 |
162 | export
163 | Discrete SignalRef where
164 |   discrete s = unfoldEval 0 (map (uncurry More) . next s)
165 |
166 | export %inline
167 | Discrete (Once World) where
168 |   discrete = eval . awaitOnce
169 |
170 | export %inline
171 | Discrete (Deferred World) where
172 |   discrete = eval . await
173 |
174 | ||| Like `discrete` but for an initially empty signal.
175 | |||
176 | ||| Fires whenever a `Just` is put into the signal.
177 | export %inline
178 | justs : Discrete t => t (Maybe a) -> Stream (Async e) es a
179 | justs s = discrete s |> catMaybes
180 |
181 | ||| Blocks the fiber and observes the given signal until the given
182 | ||| predicate returns `True`.
183 | export
184 | until : Discrete f => f a -> (a -> Bool) -> Async e [] ()
185 | until ref pred = assert_total $ discrete ref |> any pred |> drain |> mpull
186 |
187 | ||| Observes part of a value in a signal, firing whenever the value changes.
188 | export
189 | onChange : Eq b => Discrete f => f a -> (a -> b) -> Stream (Async e) es b
190 | onChange s f = discrete s |> mapOutput f |> distinct
191 |
192 | --------------------------------------------------------------------------------
193 | -- Writing from Streams to Signals
194 | --------------------------------------------------------------------------------
195 |
196 | parameters {0 f      : List Type -> Type -> Type}
197 |            {0 es     : List Type}
198 |            {0 p,r    : Type}
199 |            {auto lio : LIO (f es)}
200 |            (sig      : SignalRef p)
201 |
202 |   ||| Use the output of a pull to update the value in a signal.
203 |   export
204 |   modSig : (o -> p -> p) -> Pull f o es r -> Pull f o es r
205 |   modSig f = observe (modify sig . f)
206 |
207 |   ||| Use the output of a pull to update the value in a signal.
208 |   export
209 |   setSig : Pull f p es r -> Pull f p es r
210 |   setSig = observe (put sig)
211 |
212 |   ||| Act on the output of a pull by combining it with the current
213 |   ||| value in a signal.
214 |   export
215 |   observeSig : (o -> p -> f es ()) -> Pull f o es r -> Pull f o es r
216 |   observeSig f = observe $ \vo => get sig >>= f vo
217 |
218 |   ||| Like `observeSig` but drains the stream in the process.
219 |   export
220 |   foreachSig : (o -> p -> f es ()) -> Pull f o es r -> Pull f q es r
221 |   foreachSig f = foreach $ \vo => get sig >>= f vo
222 |
223 | --------------------------------------------------------------------------------
224 | -- Mutable
225 | --------------------------------------------------------------------------------
226 |
227 | public export
228 | interface Mutable (0 t : Type -> Type) where
229 |   mutate : {0 a : Type} -> LIO f => t a -> (a -> a) -> f ()
230 |
231 | export %inline
232 | Mutable IORef where mutate = mod
233 |
234 | export %inline
235 | Mutable SignalRef where mutate = modify
236 |
237 | parameters {0 f  : List Type -> Type -> Type}
238 |            {0 es : List Type}
239 |            {0 t  : Type -> Type}
240 |            {auto lio : LIO (f es)}
241 |            {auto mut : Mutable t}
242 |
243 |   export %inline
244 |   modAt : t a -> (o -> a -> a) -> Pull f o es r -> Pull f p es r
245 |   modAt ref fun = foreach (mutate ref . fun)
246 |
247 |   export %inline
248 |   writeTo : t a -> Pull f a es r -> Pull f p es r
249 |   writeTo ref = modAt ref const
250 |
251 |   export %inline
252 |   teeMod : t a -> (o -> a -> a) -> Pull f o es r -> Pull f o es r
253 |   teeMod ref fun = observe (mutate ref . fun)
254 |
255 |   export %inline
256 |   teeTo : t a -> Pull f a es r -> Pull f a es r
257 |   teeTo ref = teeMod ref const
258 |
259 | --------------------------------------------------------------------------------
260 | -- Events
261 | --------------------------------------------------------------------------------
262 |
263 | record EvST a where
264 |   constructor EvSS
265 |   listeners : List (Once World a)
266 |
267 | %inline
268 | evputImpl : a -> EvST a -> (EvST a, IO1 ())
269 | evputImpl v (EvSS ls) = (EvSS [], traverse1_ (\o => putOnce1 o v) ls)
270 |
271 | public export
272 | record Event e es a where
273 |   constructor E
274 |   events    : Stream (Async e) es a
275 |   {auto snk : Sink a}
276 |
277 | nextEv : IORef (EvST a) -> Async e es a
278 | nextEv ref = do
279 |   def <- onceOf a
280 |   act <- mod ref (\(EvSS ls) => EvSS $ def :: ls)
281 |   awaitOnce def
282 |
283 | ||| A discrete stream of values plus a sink for sending such values
284 | ||| to the stream.
285 | export
286 | event : (0 a : Type) -> Async e es (Event e fs a)
287 | event a = Prelude.do
288 |   r <- newref (EvSS {a} [])
289 |   pure $ E
290 |     (repeat $ eval (nextEv r))
291 |     @{S $ \v,t => let f # t := casupdate1 r (evputImpl v) t in f t}
292 |
293 | ||| Like `event` but is already "charged" with an initial value.
294 | export
295 | eventFrom : (ini : a) -> Async e es (Event e fs a)
296 | eventFrom i = {events $= \es => cons i es} <$> event a
297 |