0 | ||| Utilities for working with work loops.
  1 | module IO.Async.Loop
  2 |
  3 | import Data.Nat
  4 | import Data.Linear.Deferred
  5 | import Data.Linear.Unique
  6 |
  7 | import public IO.Async.Core
  8 | import public Data.Linear.Ref1
  9 | import Syntax.T1
 10 |
 11 | %default total
 12 |
 13 | --------------------------------------------------------------------------------
 14 | -- Fiber Implementation
 15 | --------------------------------------------------------------------------------
 16 |
 17 | record FiberImpl (es : List Type) (a : Type) where
 18 |   constructor FI
 19 |   ||| Unique identifier of the fiber
 20 |   token  : IOToken
 21 |
 22 |   ||| Set, if the fiber has been canceled.
 23 |   cncl   : Once World ()
 24 |
 25 |   ||| Set, if the fiber has run to completion.
 26 |   res    : Deferred World (Outcome es a)
 27 |
 28 | -- allocates a new fiber
 29 | newFiber : IO1 (FiberImpl es a)
 30 | newFiber t =
 31 |   let tok  # t := Unique.token1 t
 32 |       cncl # t := onceOf1 () t
 33 |       res  # t := deferredOf1 (Outcome es a) t
 34 |    in FI tok cncl res # t
 35 |
 36 | toFiber : FiberImpl es a -> Fiber es a
 37 | toFiber fbr = MkFiber (putOnce1 fbr.cncl ()) (observeDeferredAs1 fbr.res)
 38 |
 39 | --------------------------------------------------------------------------------
 40 | -- Running Fiber State
 41 | --------------------------------------------------------------------------------
 42 |
 43 | -- An item on the call stack of a running fiber. See `Stack`.
 44 | data StackItem : (e : Type) -> (es,fs : List Type) -> (a,b : Type) -> Type where 
 45 |   -- A monadic continuation. This is put on the call stack whenever we
 46 |   -- encounter the `Bind` data constructor.
 47 |   Bnd   : (a -> Async e es b) -> StackItem e es es a b
 48 |
 49 |   -- Error handling. This is put on the call stack whenever we encounter
 50 |   -- the `Attempt` data constructor.
 51 |   Att   : StackItem e es fs a (Result es a)
 52 |
 53 |   -- Instruction to increase the cancelation mask by one.
 54 |   Inc   : StackItem e es es a a
 55 |
 56 |   -- Utility to inform us that the computation is finished here.
 57 |   Abort : StackItem e [] es () a
 58 |
 59 |   -- Instruction to decrease the cancelation mask by one.
 60 |   Dec   : StackItem e es es a a
 61 |
 62 |   -- A cancelation hook. This will be ignored if the fiber is
 63 |   -- currently not canceled or cancelation cannot be observed.
 64 |   Hook  : Async e [] () -> StackItem e es es a a
 65 |
 66 | -- Properly typed stack of nested `Bind`s plus instructions
 67 | -- related to cancelation and masking of a running fiber.
 68 | --
 69 | -- While building and consuming our own call stack comes with a certain
 70 | -- overhead, this overhead is typically small compared to the cost
 71 | -- associated with spawning fibers, performing system calls, or
 72 | -- handling asynchronous boundaries.
 73 | data Stack : (e : Type) -> (es,fs : List Type) -> (a,b : Type) -> Type where
 74 |   Nil  : Stack e es es a a
 75 |   (::) : StackItem e es fs a b -> Stack e fs gs b c -> Stack e es gs a c
 76 |
 77 | ||| Internal state of a running fiber.
 78 | export
 79 | record FbrState (e : Type) where
 80 |   constructor FST
 81 |   {0 curErrs, resErrs : List Type}
 82 |   {0 curType, resType : Type}
 83 |
 84 |   fiber : FiberImpl resErrs resType -- (mutable) state of the running fiber
 85 |   mask  : Nat -- cancellation mask
 86 |   comp  : Async e curErrs curType -- current computation
 87 |   stack : Stack e curErrs resErrs curType resType -- computation stack
 88 |
 89 | ||| Result of (partially) evaluate a fiber.
 90 | public export
 91 | data RunRes : Type -> Type where
 92 |   ||| The fiber has terminated with an `Outcome`, or we arrived at
 93 |   ||| an asynchronous boundary (`Asnc` data constructor) and the
 94 |   ||| fiber should be parked until a result is ready.
 95 |   Done : RunRes e
 96 |
 97 |   ||| Evaluation of the fiber is not yet finished, but should be
 98 |   ||| rescheduled by moving the fiber at the end of the event loop's
 99 |   ||| work queue. This happens a) after a certain number of evaluation
100 |   ||| steps, or b) when `cede` is encountered.
101 |   Cont : FbrState e -> RunRes e
102 |
103 | ||| A context for submitting and running work packages asynchronously.
104 | |||
105 | ||| The basic functionality of an event loop is to allow us to spawn
106 | ||| new work packages, all of which will then be run concurrently (but not
107 | ||| necessarily in parallel), and to `cede` a running computation, so that
108 | ||| it will be processed later while allowing other work packages to be
109 | ||| processed first.
110 | |||
111 | ||| In addition, an event loop can support arbitrary additional effects, for
112 | ||| instance, the ability to setup timers, signal handlers, and asynchronous
113 | ||| `IO` actions. These additional capabilities are represented by type
114 | ||| parameter `e`, representing the event loop currently processing a work
115 | ||| package.
116 | public export
117 | interface EventLoop (0 e : Type) where
118 |   constructor EL
119 |   ||| Submits a fiber to be run by event loop `el`
120 |   spawn : (el : e) -> FbrState e -> IO1 ()
121 |
122 |   ||| Number of evaluation steps before a fiber should be rescheduled.
123 |   limit : Nat
124 |
125 | export
126 | runFbr : (el : EventLoop e) => e -> FbrState e -> IO1 (RunRes e)
127 |
128 | export
129 | runAsyncWith :
130 |      {auto el : EventLoop e}
131 |   -> e
132 |   -> Async e es a
133 |   -> (Outcome es a -> IO ())
134 |   -> IO ()
135 | runAsyncWith env act cb = runIO $ \t =>
136 |   let fbr # t := newFiber t
137 |       _   # t := observeDeferredAs1 fbr.res fbr.token (\o => ioToF1 $ cb o) t
138 |    in spawn env (FST fbr 0 act []) t
139 |
140 | export %inline
141 | runAsync : EventLoop e => e -> Async e es a -> IO ()
142 | runAsync env as = runAsyncWith env as (\_ => pure ())
143 |
144 | --------------------------------------------------------------------------------
145 | -- Async Runner (Here be Dragons)
146 | --------------------------------------------------------------------------------
147 |
148 | record CBState (es : List Type) (a : Type) where
149 |   constructor CST
150 |   {0 resErrs : List Type}
151 |   {0 envType, resType : Type}
152 |
153 |   env      : envType
154 |   cnclCB   : IO1 ()
155 |   cnclCncl : IO1 ()
156 |   fiber    : FiberImpl resErrs resType
157 |   mask     : Nat -- cancellation mask
158 |   stack    : Stack envType es resErrs a resType
159 |   {auto el : EventLoop envType}
160 |
161 | prepend : Async e [] a -> Stack e [] fs a b -> Stack e [] fs () b
162 | prepend act s = Bnd (const act) :: s
163 |
164 | hooks : Stack e es fs a b -> Stack e [] fs () b
165 | hooks (Hook h :: t) = prepend h (hooks t)
166 | hooks (_ :: t)      = hooks t
167 | hooks []            = [Abort]
168 |
169 | observeCancel : Once World (Outcome es a) -> Nat -> FiberImpl fs b -> IO1 (IO1 ())
170 | observeCancel o 0 f = observeOnce1 f.cncl (\_ => putOnce1 o Canceled)
171 | observeCancel _ _ _ = (unit1 #)
172 |
173 | -- a fiber that has already completed with the given result.
174 | synchronous : Outcome es a -> Fiber es a
175 | synchronous o = MkFiber unit1 (\_,cb,t => let _ # t := cb o t in unit1 # t)
176 |
177 | -- a fiber from an asynchronous computation.
178 | %noinline
179 | asynchronous : ((Result es a -> IO1 ()) -> IO1 (IO1 ())) -> IO1 (Fiber es a)
180 | asynchronous install t =
181 |   let def     # t := deferredOf1 (Outcome es a) t
182 |       cleanup # t := install (putDeferred1 def . toOutcome) t
183 |       cncl        := T1.do cleanupputDeferred1 def Canceled
184 |    in MkFiber cncl (observeDeferredAs1 def) # t
185 |
186 | %noinline
187 | spawnCB : CBState es a -> Outcome es a -> IO1 ()
188 | spawnCB (CST env c1 c2 fbr cm st) o t =
189 |   case o of
190 |     Succeeded r => let _ # t := c2 t in spawn env (FST fbr cm (Val r) st) t
191 |     Error     x => let _ # t := c2 t in spawn env (FST fbr cm (Err x) st) t
192 |     Canceled    => let _ # t := c1 t in spawn env (FST fbr 1  (Val ()) (hooks st)) t
193 |
194 | %noinline
195 | writeOnCB :
196 |      Once World (Outcome es a)
197 |   -> ((Result es a -> IO1 ()) -> IO1 (IO1 ()))
198 |   -> IO1 (IO1 ())
199 | writeOnCB o f t = f (putOnce1 o . toOutcome) t
200 |
201 | %noinline
202 | obsOnce : Once World (Outcome es a) -> CBState es a -> IO1 (RunRes e)
203 | obsOnce o st t = let _ # t := observeOnce1 o (spawnCB st) t in Done # t
204 |
205 | -- Finalize the fiber with the given outcome and call all its observers.
206 | %inline
207 | finalize : FiberImpl es a -> Outcome es a -> IO1 (RunRes e)
208 | finalize fbr o t = let _ # t := putDeferred1 fbr.res o t in Done # t
209 |
210 | -- Invokes runR or runC depending on if the fiber has
211 | -- been canceled and cancelation is currently observable
212 | run :
213 |      {auto el : EventLoop e}
214 |   -> (env : e)           -- the event loop on which the fiber runs
215 |   -> Async e es a        -- next computation step of the running fiber
216 |   -> (cancelMask  : Nat) -- 0 if cancelation can be observed > 0 otherwise
217 |   -> (cedeCount   : Nat) -- if at 0, the fiber will be rescheduled
218 |   -> FiberImpl fs b      -- mutable state of the running fiber
219 |   -> Stack e es fs a b   -- call stack of the running fiber
220 |   -> IO1 (RunRes e)
221 |
222 | -- evaluates an alive fiber: one that has not been canceled or
223 | -- for which cancelation can currently not be observed
224 | runR :
225 |      {auto el : EventLoop e}
226 |   -> (env : e)
227 |   -> Async e es a
228 |   -> (cancelMask  : Nat)
229 |   -> (cedeCount   : Nat)
230 |   -> FiberImpl fs b
231 |   -> Stack e es fs a b
232 |   -> IO1 (RunRes e)
233 |
234 | -- runs a canceled fiber
235 | -- we no longer need a `cancelMask` argument, because all we
236 | -- are going to do now is extract and run the cancellation hooks
237 | runC :
238 |      {auto el : EventLoop e}
239 |   -> (env : e)
240 |   -> Async e es a
241 |   -> (cedeCount : Nat)
242 |   -> FiberImpl fs b
243 |   -> Stack e es fs a b
244 |   -> IO1 (RunRes e)
245 |
246 | -- the cede count arrived at 0 so we stop and allow the fiber
247 | -- to be rescheduled on the event loop
248 | run env act cm 0     fbr st t = Cont (FST fbr cm act st) # t
249 |
250 | -- the cancel mask is at 0 so cancelation can currently be observed
251 | -- we check if the fiber has been canceled and either invoke
252 | -- `runC` or `runR`
253 | run env act 0  (S k) fbr st t =
254 |   case completedOnce1 fbr.cncl t of
255 |     False # t => runR env act 0 k fbr st t
256 |     True  # t => runC env act k fbr st t
257 |
258 | -- cancellation can currently not be observed so there is no
259 | -- point in checking if the fiber has been canceled.
260 | run env act c  (S k) fbr st t = runR env act c k fbr st t
261 |
262 | runC env act cc fbr st t =
263 |   case act of
264 |     UC f   => run env (f fbr.token 1) 1 cc fbr (Dec::st) t
265 |     Val x => case st of
266 |       Bnd f :: tl => case f x of
267 |         UC g => run env (g fbr.token 1) 1 cc fbr (Dec::tl) t
268 |         a    => run env (pure ()) 1 cc fbr (hooks st) t
269 |       Att :: tl   => runC env (Val $ Right x) cc fbr tl t
270 |       Inc :: tl   => run env (Val x) 1 cc fbr tl t
271 |       _           => run env (pure ()) 1 cc fbr (hooks st) t
272 |     Err x => case st of
273 |       Att :: tl => runC env (Val $ Left x) cc fbr tl t
274 |       Inc :: tl => run env (Err x) 1 cc fbr tl t
275 |       _         => run env (pure ()) 1 cc fbr (hooks st) t
276 |     _    => run env (pure ()) 1 cc fbr (hooks st) t
277 |
278 | runR env act cm cc fbr st t =
279 |   case act of
280 |     Bind x f => case x of
281 |       Val x => run env (f x) cm cc fbr st t
282 |       Self  => run env (f fbr.token) cm cc fbr st t
283 |       _     => run env x cm cc fbr (Bnd f :: st) t
284 |
285 |     Val x      => case st of
286 |       Bnd f  :: tl => run env (f x) cm        cc fbr tl t
287 |       Inc    :: tl => run env act   (S cm)    cc fbr tl t
288 |       Dec    :: tl => run env act   (pred cm) cc fbr tl t
289 |       -- ignore cancel hook because cancelation is currently not
290 |       -- observable.
291 |       Hook h :: tl => run env act   cm        cc fbr tl t
292 |       Abort  :: tl => finalize fbr Canceled t
293 |       Att    :: tl => run env (Val $ Right x) cm cc fbr tl t
294 |       []          => finalize fbr (Succeeded x) t
295 |
296 |     Err x      => case st of
297 |       Att    :: tl => run env (Val $ Left x) cm cc fbr tl t
298 |       Bnd _  :: tl => run env (Err x)        cm cc fbr tl t
299 |       Inc    :: tl => run env act   (S cm)      cc fbr tl t
300 |       Dec    :: tl => run env act   (pred cm)   cc fbr tl t
301 |       -- ignore cancel hook because cancelation is currently not
302 |       -- observable.
303 |       Hook h :: tl => run env act   cm        cc fbr tl t
304 |       Abort  :: tl => finalize fbr Canceled t
305 |       []          => finalize fbr (Error x) t
306 |
307 |     -- For certain fibers it is not necessary to actually spawn them
308 |     -- on the event loop, so we optimize those away.
309 |     Start x     => case x of
310 |       Asnc reg =>
311 |         let f2 # t := asynchronous reg t
312 |          in run env (Val f2) cm cc fbr st t
313 |       Cancel => run env (Val $ synchronous Canceled) cm cc fbr st t
314 |       Val v  => run env (Val $ synchronous (Succeeded v)) cm cc fbr st t
315 |       Err x  => run env (Val $ synchronous (Error x)) cm cc fbr st t
316 |       Self   => run env (Val $ synchronous (Succeeded fbr.token)) cm cc fbr st t
317 |       _ =>
318 |         let fbr2 # t := newFiber t
319 |             _    # t := spawn env (FST fbr2 0 x []) t
320 |          in run env (Val $ toFiber fbr2) cm cc fbr st t
321 |
322 |     Sync x      =>
323 |       let r # t := ioToF1 x t
324 |        in run env (terminal r) cm cc fbr st t
325 |
326 |     Attempt x => run env x cm cc fbr (Att :: st) t
327 |
328 |     Cancel      => 
329 |       let _ # t := putOnce1 fbr.cncl () t
330 |        in run env (Val ()) cm cc fbr st t
331 |
332 |     OnCncl x y  => run env x cm cc fbr (Hook y :: st) t
333 |
334 |     UC f        => run env (f fbr.token (S cm)) (S cm) cc fbr (Dec::st) t
335 |
336 |     Env         => run env (Val env) cm cc fbr st t
337 |
338 |     Cede        => Cont (FST fbr cm (Val ()) st) # t
339 |
340 |     Self        => run env (Val fbr.token) cm cc fbr st t
341 |
342 |     Asnc f =>
343 |       let o  # t := onceOf1 (Outcome es a) t
344 |           c1 # t := writeOnCB o f t
345 |           c2 # t := observeCancel o cm fbr t
346 |        in case peekOnce1 o t of
347 |             Nothing  # t => obsOnce o (CST env c1 c2 fbr cm st) t
348 |             Just out # t => case out of
349 |               Succeeded r => let _ # t := c2 t in run env (Val r) cm cc fbr st t
350 |               Error     x => let _ # t := c2 t in run env (Err x) cm cc fbr st t
351 |               Canceled    => let _ # t := c1 t in run env (pure ()) 1 cc fbr (hooks st) t
352 |
353 |     APoll tok k x => case tok == fbr.token && k == cm of
354 |       True  => run env x (pred cm) cc fbr (Inc :: st) t
355 |       False => run env x cm        cc fbr st t
356 |
357 | runFbr env (FST fbr msk act st) = run env act msk (limit @{el}) fbr st
358 |