0 | module FS.Scope
  1 |
  2 | import Control.Monad.Elin
  3 |
  4 | import Data.Linear.Deferred
  5 | import Data.Linear.ELift1
  6 | import Data.Linear.Ref1
  7 | import Data.Linear.Traverse1
  8 | import Data.Linear.Unique
  9 | import Data.List
 10 | import Data.Maybe
 11 | import Data.Nat
 12 | import Data.SortedMap
 13 | import Data.SortedSet
 14 |
 15 | import IO.Async
 16 |
 17 | %default total
 18 |
 19 | --------------------------------------------------------------------------------
 20 | -- Hook
 21 | --------------------------------------------------------------------------------
 22 |
 23 | record Hook (f : List Type -> Type -> Type) where
 24 |   constructor H
 25 |   cleanup : f [] ()
 26 |   lease   : f [] (f [] ())
 27 |
 28 | updCleanup, updUnlease : (Bool,Nat) -> ((Bool,Nat),Bool)
 29 | updCleanup (False,b) = ((True,b),b == 0)
 30 | updCleanup p         = (p,False)
 31 |
 32 | updUnlease (b,1) = ((b,0),b)
 33 | updUnlease (b,n) = ((b,pred n),False)
 34 |
 35 | updLease : (Bool,Nat) -> ((Bool,Nat),())
 36 | updLease (b,n) = ((b,S n),())
 37 |
 38 | hook : ELift1 s f => f [] () -> F1 s (Hook f)
 39 | hook act t =
 40 |  let state # t := ref1 (False,Z) t -- (cleaned up, leased)
 41 |   in H (cl state) (ls state) # t
 42 |
 43 |   where
 44 |     cl : Ref s (Bool,Nat) -> f [] ()
 45 |     cl state = when !(update state updCleanup) act
 46 |
 47 |     unlease : Ref s (Bool,Nat) -> f [] ()
 48 |     unlease state = when !(update state updUnlease) act
 49 |
 50 |     ls : Ref s (Bool,Nat) -> f [] (f [] ())
 51 |     ls state = update state updLease $> unlease state
 52 |
 53 | --------------------------------------------------------------------------------
 54 | -- Interrupt
 55 | --------------------------------------------------------------------------------
 56 |
 57 | ||| An interruption handler.
 58 | |||
 59 | ||| This is used for cross-stream interruption when running streams in
 60 | ||| parallel. For instance, when nondeterministically merging two streams
 61 | ||| A and B (see `FS.Concurrent.merge`), A and B will be run in parallel
 62 | ||| each on its own fiber. Both should be interrupted if downstream terminates,
 63 | ||| for instance because the number of emitted values have been limited via
 64 | ||| a call to `take`.
 65 | |||
 66 | ||| Implementation detail: When running a stream, we check on each iteration
 67 | ||| whether the current scope has been interrupted or not. In addition, in
 68 | ||| case of wrapped effectful computations - which might be potentially long
 69 | ||| running (think of a timer or waiting for a connection or a mouse click) -
 70 | ||| the wrapped effect is raced against the stream being interrupted.
 71 | public export
 72 | data Interrupt : (f : List Type -> Type -> Type) -> Type where
 73 |   None : Interrupt f
 74 |   I    : (def : Deferred World a) -> Interrupt (Async e)
 75 |
 76 | --------------------------------------------------------------------------------
 77 | -- Compilation Targets
 78 | --------------------------------------------------------------------------------
 79 |
 80 | ||| Target effect of stream compilation.
 81 | |||
 82 | ||| Effect type `f` (of type `List Type -> Type -> Type`) can be used to
 83 | ||| run (= evaluate) streams in state thread `s`.
 84 | ||| Currently, this is either `Elin s` for running synchronous streams in state
 85 | ||| effect `s`, or `Async e` for running streams concurrently.
 86 | |||
 87 | ||| If `s` is universally quantified, `Elin s` streams can be converted to pure
 88 | ||| functions making use of local mutable state, resource management, and error
 89 | ||| handling. If `s` equals `World`, `Elin World` can be used as a regular
 90 | ||| (synchronous) monad with error handling.
 91 | public export
 92 | interface ELift1 s f => Target s f | f where
 93 |   ||| Combines two interruption handlers
 94 |   combineInterrupts : (x,y : Interrupt f) -> F1 s (Interrupt f, List (f [] ()))
 95 |
 96 |   ||| Returns `True` if the stream has been interrupted.
 97 |   isInterrupted : Interrupt f -> f [] Bool
 98 |
 99 |   ||| Races an effectful computation against stream interruption
100 |   raceInterrupt : Interrupt f -> f es a -> f [] (Outcome es a)
101 |
102 | export %inline
103 | Target s (Elin s) where
104 |   combineInterrupts None None t = (None, []) # t
105 |   isInterrupted _ = pure False
106 |   raceInterrupt _ = map toOutcome . attempt
107 |
108 | export
109 | Target World (Async e) where
110 |   combineInterrupts None   x      t = (x, []) # t
111 |   combineInterrupts x      None   t = (x, []) # t
112 |   combineInterrupts (I d1) (I d2) t =
113 |     let d3 # t := deferredOf1 () t
114 |         f1 # t := observeDeferred1 d1 (\_ => putDeferred1 d3 ()) t
115 |         f2 # t := observeDeferred1 d2 (\_ => putDeferred1 d3 ()) t
116 |      in (I d3, [lift1 f1, lift1 f2]) # t
117 |
118 |   isInterrupted None  = pure False
119 |   isInterrupted (I d) = completed d
120 |
121 |   raceInterrupt None  act = toOutcome <$> attempt act
122 |   raceInterrupt (I d) act =
123 |     racePair {fs = []} act (await d) >>= \case
124 |       Left  (o,x) => cancel x $> o
125 |       Right (x,_) => cancel x $> Canceled
126 |
127 | --------------------------------------------------------------------------------
128 | -- Scopes
129 | --------------------------------------------------------------------------------
130 |
131 | ||| IDs for comparing and ordering scopes. This is for internal
132 | ||| use only. In particular, looking at the internal representation
133 | ||| of a `ScopeID` via `Show` is *not* referentially transparent and
134 | ||| should be used for debugging purposes only.
135 | export
136 | record ScopeID where
137 |   constructor SID
138 |   val : Nat
139 |
140 | export %inline
141 | Eq ScopeID where
142 |   SID x == SID y = x == y
143 |
144 | export %inline
145 | Ord ScopeID where
146 |   compare (SID x) (SID y) = compare x y
147 |
148 | export %inline
149 | Show ScopeID where
150 |   show = show . val
151 |
152 | scopeID : F1 s ScopeID
153 | scopeID t = let tok # t := token1 t in SID (unsafeVal tok) # t
154 |
155 | export
156 | record Node (f : List Type -> Type -> Type)
157 |
158 | ||| State of scopes of a running stream.
159 | public export
160 | 0 ScopeST : (f : List Type -> Type -> Type) -> Type
161 | ScopeST f = SortedMap ScopeID (Node f)
162 |
163 | public export
164 | 0 STRef : (s : Type) -> (f : List Type -> Type -> Type) -> Type
165 | STRef s f = Ref s (ScopeST f)
166 |
167 | ||| Cancelation scopes
168 | |||
169 | ||| Functional streams are evaluated in scopes, which are organized as
170 | ||| a tree, just like stream evaluation can be thought of as a tree:
171 | ||| Sequencing of streams means nesting in the resulting scope tree,
172 | ||| while parallel evaluation (for instance, when zipping or merging
173 | ||| streams) means branching. Once a stream is exhausted,
174 | ||| it's scope is cleaned up and all resources allocated in this scope
175 | ||| are released, including the resources of all child scope.
176 | |||
177 | ||| In addition to (internal) cancelation, streams can be run concurrently,
178 | ||| in which case the can be interrupted by an external event such as
179 | ||| the exhaustion of a timer or the termination of another stream.
180 | ||| At every evaluation step of a stream we check, if the current scope
181 | ||| has been canceled. If this is the case, evaluation of the stream
182 | ||| is aborted.
183 | |||
184 | ||| Just like `Pull`s and `Stream`s, a `Scope` is parameterized by its
185 | ||| effect type.
186 | public export
187 | record Scope (f : List Type -> Type -> Type) where
188 |   constructor S
189 |   {0 tstate : Type}
190 |
191 |   ||| this scope's unique identifier
192 |   id        : ScopeID
193 |
194 |   ||| ID of this scope's root scope
195 |   root      : ScopeID
196 |
197 |   ||| parent scopes `([parent, grand-parent, ... , root])`
198 |   ancestors : List ScopeID
199 |
200 |   ||| Handler to check for stream interruption
201 |   interrupt : Interrupt f
202 |
203 |   state     : Ref tstate (ScopeST f)
204 |
205 |   {auto tgt : Target tstate f}
206 |
207 | -- a node in the scope graph
208 | export
209 | record Node (f : List Type -> Type -> Type) where
210 |   constructor N
211 |   ||| The immutable scope state.
212 |   scope : Scope f
213 |
214 |   ||| optional cleanup hook for a resource allocated in this scope
215 |   cleanup   : List (Hook f)
216 |
217 |   ||| list of child scopes
218 |   children  : SortedSet ScopeID
219 |
220 | --------------------------------------------------------------------------------
221 | -- Handling Scopes
222 | --------------------------------------------------------------------------------
223 |
224 | -- Inserts a new scope. In case of the root scope, field
225 | -- `rootChildren` is adjusted.
226 | insertScope : Node f -> ScopeST f -> ScopeST f
227 | insertScope s = insert s.scope.id s
228 |
229 | -- There is always a `root` scope, which is the parent of
230 | -- of all scopes.
231 | getRoot : Target s f => STRef s f -> ScopeID -> ScopeST f -> Node f
232 | getRoot ref id = fromMaybe (N (S id id [] None ref) [] empty) . lookup id
233 |
234 | -- Finds the closest ancestor scope that is still open.
235 | --
236 | -- Note: The `root` scope cannot be fully closed, so this will always
237 | --       return a sope.
238 | openAncestor : ScopeST f -> Scope f -> Node f
239 | openAncestor ss s@(S {}) = go s.ancestors
240 |   where
241 |     go : List ScopeID -> Node f
242 |     go []        = getRoot s.state s.root ss
243 |     go (x :: xs) = fromMaybe (go xs) (lookup x ss)
244 |
245 | -- Returns the given scope if it is still open or its closest ancestor.
246 | --
247 | -- Note: The `root` scope cannot be fully closed, so this will always
248 | --       return a sope.
249 | openSelfOrAncestor : ScopeST f -> Scope f -> Node f
250 | openSelfOrAncestor ss sc =
251 |   fromMaybe (openAncestor ss sc) (lookup sc.id ss)
252 |
253 | removeChild : Bool -> Scope f -> ScopeST f -> ScopeST f
254 | removeChild False sc st = st
255 | removeChild True  sc st =
256 |  let par := openAncestor st sc
257 |   in insertScope ({children $= delete sc.id} par) st
258 |
259 | deleteNode : ScopeST f -> Node f -> ScopeST f
260 | deleteNode m n = delete n.scope.id m
261 |
262 | -- Creates a new root scope
263 | root : Target s f => STRef s f -> F1 s (Scope f)
264 | root ref t =
265 |   let sid # t := scopeID t
266 |       r       := N (S sid sid [] None ref) [] empty
267 |       _   # t := mod1 ref (insertScope r) t
268 |    in r.scope # t
269 |
270 | ||| Opens and returns a new child scope for the given parent
271 | ||| scope.
272 | |||
273 | ||| If the parent scope has already been closed, its closest
274 | ||| open ancestor will be used as the new scope's parent instead.
275 | export
276 | openScope : Target s f =>  STRef s f -> Interrupt f -> Scope f -> F1 s (Scope f)
277 | openScope ref int sc@(S i rt as ir _) t =
278 |  let sid          # t := scopeID t
279 |      (sint, cncl) # t := combineInterrupts ir int t
280 |      hooks        # t := traverse1 hook cncl t
281 |   in casupdate1 ref (\ss =>
282 |       let par  := openSelfOrAncestor ss sc
283 |           ancs := i :: as
284 |           node := N (Scope.S sid rt ancs sint ref) hooks empty
285 |           par2 := {children $= insert sid} par
286 |        in (insertScope par2 $ insertScope node ss, node.scope)) t
287 |
288 | ||| Adds a new cleanup hook to the given scope or its closest
289 | ||| open parent scope.
290 | export %inline
291 | addHook : Scope f -> f [] () -> f [] ()
292 | addHook sc@(S {}) act = do
293 |   hk <- lift1 (hook act)
294 |   Ref1.mod sc.state $ \ss =>
295 |     let res := {cleanup $= (hk ::)} (openSelfOrAncestor ss sc)
296 |      in insertScope res ss
297 |
298 | parameters (ref : STRef s f)
299 |   findNode : ScopeID -> F1 s (Maybe $ Node f)
300 |   findNode x t = let st # t := read1 ref t in lookup x st # t
301 |
302 |   findNodes : List ScopeID -> F1 s (List $ Node f)
303 |   findNodes xs t = let ms # t := traverse1 findNode xs t in catMaybes ms # t
304 |
305 |   childNodesL : List (Node f) -> List (Node f) -> F1 s (List (Node f))
306 |
307 |   childNodes : List (Node f) -> Node f -> F1 s (List (Node f))
308 |   childNodes ns n t =
309 |    let ns2 # t := findNodes (reverse $ Prelude.toList n.children) t
310 |     in childNodesL (n::ns) ns2 t
311 |
312 |   childNodesL ns []      t = ns # t
313 |   childNodesL ns (c::cs) t =
314 |     assert_total $
315 |      let ns2 # t := childNodes ns c t
316 |       in childNodesL ns2 cs t
317 |
318 | parameters {auto tgt : Target s f}
319 |            (ref      : STRef s f)
320 |
321 |   ||| Closes the scope of the given ID plus all its child scopes,
322 |   ||| releasing all allocated resources in reverse order of allocation
323 |   ||| along the way.
324 |   export
325 |   close : (remove : Bool) -> ScopeID -> f [] ()
326 |   close b id = do
327 |     Just n <- lift1 (findNode ref id) | Nothing => pure ()
328 |     cs     <- lift1 (childNodes ref [] n)
329 |     mod ref $ \m => removeChild b n.scope (foldl deleteNode m cs)
330 |     traverse_ cleanup (cs >>= cleanup)
331 |
332 | ||| Leases all cleanup hooks from this scope as well as its direct
333 | ||| children and ancestors.
334 | |||
335 | ||| Invoke the given action to release them again.
336 | export
337 | lease : Scope f -> f [] (f [] ())
338 | lease sc@(S i r as is ref) = do
339 |   Just nd <- lift1 (findNode ref i) | Nothing => pure (pure ())
340 |   ns      <- lift1 (findNodes ref as)
341 |   all     <- lift1 (childNodes ref ns nd)
342 |   cs      <- traverse lease (all >>= cleanup)
343 |   pure (sequence_ cs)
344 |
345 | ||| Creates a new root scope and returns it together with the set of
346 | ||| scopes for the given effect type.
347 | export %inline
348 | newScope : Target s f => f [] (Scope f)
349 | newScope = do
350 |   ref <- newref {a = ScopeST f} empty
351 |   sc  <- lift1 (FS.Scope.root ref)
352 |   pure sc
353 |
354 | --------------------------------------------------------------------------------
355 | -- Debugging Utilities
356 | --------------------------------------------------------------------------------
357 |
358 | export
359 | printScope : Scope f -> String
360 | printScope (S i _ as _ _) = fastConcat . intersperse " <- " . map show $ i::as
361 |
362 | export %inline
363 | Interpolation (Scope f) where interpolate = printScope
364 |
365 | export
366 | logScope : HasIO (f es) => String -> Scope f -> f es ()
367 | logScope nm sc = putStrLn "Scope \{nm}: \{sc}"
368 |